#include <unistd.h>
#include <signal.h>
#include <sys/select.h>
-#define WAIT_TIME 3
+#include "tlookup.h"
#endif
#define NUM_THREADS 1
int sizeOfHostArray;
int numHostsInSystem;
int myIndexInHostArray;
+int waitThreadMid;
+unsigned int waitThreadID;
+
unsigned int oidsPerBlock;
unsigned int oidMin;
unsigned int oidMax;
int bytesRecv = 0;
int totalObjSize = 0;
+#ifdef RECOVERY
/***********************************
* Global variables for Duplication
***********************************/
int numLiveHostsInSystem;
int flipBit; // Used to distribute requests between primary and backup evenly
unsigned int *locateObjHosts;
-__thread int timeoutFlag;
-extern int leaderFixing;
-extern pthread_mutex_t leaderFixing_mutex;
-extern pthread_mutex_t liveHosts_mutex;
+#endif
-unsigned int liveTransactions[25];
+int transRetryFlag;
unsigned int transIDMax;
unsigned int transIDMin;
unsigned int transIDIndex;
-#ifdef DEBUG
char ip[16];
-#endif
+#ifdef RECOVERY
/******************************
* Global variables for Paxos
******************************/
unsigned int origleader;
unsigned int temp_v_a;
int paxosRound;
+#endif
void printhex(unsigned char *, int);
plistnode_t *createPiles();
/*******************************
* Send and Recv function calls
*******************************/
-void send_data(int fd, void *buf, int buflen) {
-#ifdef DEBUG
-// printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
-#endif
- char *buffer = (char *)(buf);
+/* Send exactly buflen bytes from buf on socket fd, looping over partial sends.
+ * Returns 0 once everything has been sent. When RECOVERY is defined it returns
+ * -1 on ECONNRESET/EAGAIN/EWOULDBLOCK (peer treated as failed) or when send()
+ * returns 0, and -2 on any other send() error. Without RECOVERY only the
+ * success branch is compiled, so a failing send() is simply retried. */
+int send_data(int fd, void *buf, int buflen) {
+ char *buffer = (char *)(buf);
int size = buflen;
int numbytes;
+
while (size > 0) {
- numbytes = send(fd, buffer, size, 0);
- bytesSent = bytesSent + numbytes;
+
+#ifdef GDBDEBUG
+GDBSEND1:
+#endif
+ numbytes = send(fd, buffer, size, 0);
+
+ if( numbytes>0) {
+ bytesSent += numbytes;
+ // advance past what was already sent so a partial send resumes where it
+ // stopped instead of resending the start of the buffer
+ buffer += numbytes;
+ size -= numbytes;
+ }
#ifdef RECOVERY
-#ifdef DEBUG
-// printf("%s-> numbytes: %d\n", __func__, numbytes);
+ else if( numbytes < 0) {
+ // Send returned an error.
+ // Analyze underlying cause
+#ifndef DEBUG
+ printf("%s -> fd : %d errno = %d %s\n",__func__, fd, errno,strerror(errno));
+ fflush(stdout);
#endif
- if(errno == ECONNRESET) { // EINT/EPIPE??; Connection reset, possible disconnected machine
+ if(errno == ECONNRESET || errno == EAGAIN || errno == EWOULDBLOCK) {
+ // machine has failed
+ //
+ // if we see EAGAIN w/o failures, we should record the time
+ // when we start send and finish send see if it is longer
+ // than our threshold
+ //
#ifdef DEBUG
- printf("%s-> errno = ECONNRESET; connection reset\n", __func__);
- printf("***SETTING TIMEOUTFLAG***\n");
+ printf("%s -> EAGAIN : %s\n",__func__,(errno == EAGAIN)?"TRUE":"FALSE");
#endif
- errno = 0;
- timeoutFlag = 1;
- return;
- }
- else if(errno == EAGAIN || errno == EWOULDBLOCK) {
+ return -1;
+ } else {
+#ifdef GDBDEBUG
+ // NOTE(review): 4 is presumably EINTR (signal from the debugger) -- confirm
+ if(errno == 4)
+ goto GDBSEND1;
+#endif
+
+
#ifdef DEBUG
- printf("%s-> errno = EAGAIN|EWOULDBLOCK; socket timeout\n", __func__);
- printf("***SETTING TIMEOUTFLAG***\n");
+ printf("%s -> Unexpected ERROR!\n",__func__);
#endif
- errno = 0;
- timeoutFlag = 1;
- return;
- }
- else if(numbytes == -1) {
+ return -2;
+ }
+ }
+ else{
+ // Case : numbytes == 0
+ // // machine has failed -- this case probably doesn't occur in reality
+ //
+
+
+
#ifdef DEBUG
- printf("%s-> numbytes = -1; socket timeout\n", __func__);
- printf("***SETTING TIMEOUTFLAG***\n");
+ printf("%s -> SHOULD NOT BE HERE\n",__func__);
#endif
- timeoutFlag = 1;
- return;
- }
-#else
- if (numbytes == -1) {
- perror("send");
- exit(0);
- }
+ return -1;
+ }
#endif
- buffer += numbytes;
- size -= numbytes;
- }
+ } // close while loop
#ifdef DEBUG
-// printf("%s-> Exiting\n", __func__);
+ printf("%s-> Exiting\n", __func__);
#endif
+
+ return 0; // completed sending data
}
-void recv_data(int fd, void *buf, int buflen) {
-#ifdef DEBUG
-// printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
-#endif
- char *buffer = (char *)(buf);
- int size = buflen;
+/* Receive exactly buflen bytes from socket fd into buf, looping over partial
+ * reads. Returns 0 once all bytes have arrived. When RECOVERY is defined it
+ * returns -1 on ECONNRESET/EWOULDBLOCK, on EAGAIN after 5 retries, or when
+ * recv() returns 0, and -2 on any other recv() error. Without RECOVERY only
+ * the success branch is compiled, so a failing recv() is simply retried. */
+
+int recv_data(int fd, void *buf, int buflen) {
+ char *buffer = (char *)(buf);
+ int size = buflen;
int numbytes;
- while (size > 0) {
- numbytes = recv(fd, buffer, size, 0);
- bytesRecv = bytesRecv + numbytes;
+ int trycounter = 0;
+
+ while (size > 0) {
+#ifdef GDBDEBUG
+GDBRECV1:
+#endif
+ numbytes = recv(fd, buffer, size, 0);
+
+ if (numbytes>0) {
+ // keep the receive counter in step with bytesSent in send_data
+ bytesRecv += numbytes;
+ buffer += numbytes;
+ size -= numbytes;
+ }
#ifdef RECOVERY
-#ifdef DEBUG
-// printf("%s-> numbytes: %d\n", __func__, numbytes);
-#endif
- if(errno == ECONNRESET) {
-#ifdef DEBUG
- printf("%s-> errno = ECONNRESET; connection reset\n", __func__);
- printf("***SETTING TIMEOUTFLAG***\n");
+ else if (numbytes<0){
+ //Receive returned an error.
+ //Analyze underlying cause
+#ifdef DEBUG
+ printf("%s-> fd : %d errno = %d %s\n", __func__, fd, errno, strerror(errno));
+#endif
+ if(errno == ECONNRESET || errno == EAGAIN || errno == EWOULDBLOCK) {
+ //machine has failed
+ //if we see EAGAIN w/o failures, we should record the time
+ //when we start read and finish read and see if it is longer
+ //than our threshold
+#ifdef DEBUG
+ printf("%s -> EAGAIN : %s\n",__func__,(errno == EAGAIN)?"TRUE":"FALSE");
+#endif
+ if(errno == EAGAIN) {
+ if(trycounter < 5) {
+#ifndef DEBUG
+ printf("%s -> TRYcounter increases\n",__func__);
+#endif
+ trycounter++;
+ continue;
+ }
+ else
+ return -1;
+ }
+ return -1;
+ } else {
+#ifdef GDBDEBUG
+ // NOTE(review): 4 is presumably EINTR (signal from the debugger) -- confirm
+ if(errno == 4)
+ goto GDBRECV1;
#endif
- errno = 0;
- timeoutFlag = 1;
- return;
- }
- else if(errno == EAGAIN || errno == EWOULDBLOCK) {
+
#ifdef DEBUG
- printf("%s-> errno = EAGAIN|EWOULDBLOCK; socket timeout\n", __func__);
- printf("***SETTING TIMEOUTFLAG***\n");
+ printf("%s -> Unexpected ERROR!\n",__func__);
+ printf("%s-> errno = %d %s\n", __func__, errno, strerror(errno));
#endif
- errno = 0;
- timeoutFlag = 1;
- return;
- }
- else if(numbytes == -1) {
+ return -2;
+ }
+ } else {
+ //Case: numbytes==0
+ //machine has failed -- this case probably doesn't occur in reality
+ //
#ifdef DEBUG
- printf("%s-> numbytes = -1; socket timeout\n", __func__);
- printf("***SETTING TIMEOUTFLAG***\n");
+ printf("%s -> SHOULD NOT BE HERE\n",__func__);
#endif
- timeoutFlag = 1;
- return;
- }
-#else
- if (numbytes == -1) {
- perror("recv");
- exit(0);
+ return -1;
}
#endif
- buffer += numbytes;
- size -= numbytes;
- }
-#ifdef DEBUG
-// printf("%s-> Exiting\n", __func__);
-#endif
-}
-
-void recv_data_block(int fd, void *buf, int buflen) {
-#ifdef DEBUG
- printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
-#endif
- char *buffer = (char *)(buf);
- int size = buflen;
- int numbytes;
- while (size > 0) {
- numbytes = recv(fd, buffer, size, 0);
-#ifdef DEBUG
- printf("%s-> numbytes: %d\n", __func__, numbytes);
-#endif
- if(errno == EAGAIN || errno == EWOULDBLOCK) {
- errno = 0;
- }
- if(numbytes != -1) {
- bytesRecv = bytesRecv + numbytes;
- buffer += numbytes;
- size -= numbytes;
- }
- }
+ } //close while loop
#ifdef DEBUG
- printf("%s-> Exiting\n", __func__);
+ printf("%s -> fd = %d Exiting\n",__func__,fd);
#endif
+ return 0; // got all the data
}
+/* Receive exactly buflen bytes from socket fd into buf.
+ * Returns 1 when all bytes were received, 0 if recv() returned 0 before the
+ * buffer was filled, and -1 if recv() failed (the error is printed). */
int recv_data_errorcode(int fd, void *buf, int buflen) {
#ifdef DEBUG
- printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
+ printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
#endif
char *buffer = (char *)(buf);
int size = buflen;
- int numbytes;
- while (size > 0) {
- numbytes = recv(fd, buffer, size, 0);
+ int numbytes;
+ while (size > 0) {
+ numbytes = recv(fd, buffer, size, 0);
+ // capture errno now: the printf calls below may overwrite it before it is reported
+ int recverrno = errno;
#ifdef DEBUG
- printf("%s-> numbytes: %d\n", __func__, numbytes);
+ printf("%s-> numbytes: %d\n", __func__, numbytes);
#endif
- if (numbytes==0)
- return 0;
- else if (numbytes == -1) {
- perror("recv_data_errorcode");
- return -1;
- }
- buffer += numbytes;
- size -= numbytes;
- }
+ if (numbytes==0)
+ return 0;
+ else if (numbytes == -1) {
+ printf("%s -> ERROR NUMBER = %d %s\n",__func__,recverrno,strerror(recverrno));
+ errno = recverrno; // restore so perror reports the recv() failure
+ perror("recv_data_errorcode");
+ return -1;
+ }
+
+ buffer += numbytes;
+ size -= numbytes;
+ }
#ifdef DEBUG
- printf("%s-> Exiting\n", __func__);
+ printf("%s-> Exiting\n", __func__);
#endif
- return 1;
+ return 1;
}
void printhex(unsigned char *ptr, int numBytes) {
updateLiveHosts();
setLocateObjHosts();
updateLiveHostsCommit();
- leader = paxos();
+ paxos();
if(!allHostsLive()) {
printf("Not all hosts live. Exiting.\n");
exit(-1);
#endif
}
-// Search for an address for a given oid
/*#define INLINE inline __attribute__((always_inline))
INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
addtransaction(oid);
#endif
- if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
+ if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
#ifdef DEBUG
- printf("%s-> Grab from this machine\n", __func__);
+ printf("%s-> Grab from this machine\n", __func__);
#endif
#ifdef TRANSSTATS
- nmhashSearch++;
+ nmhashSearch++;
#endif
- /* Look up in machine lookup table and copy into cache*/
- GETSIZE(size, objheader);
- size += sizeof(objheader_t);
- objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
- memcpy(objcopy, objheader, size);
- /* Insert into cache's lookup table */
- STATUS(objcopy)=0;
- t_chashInsert(OID(objheader), objcopy);
+ /* Look up in machine lookup table and copy into cache*/
+ GETSIZE(size, objheader);
+ size += sizeof(objheader_t);
+ objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
+ memcpy(objcopy, objheader, size);
+ /* Insert into cache's lookup table */
+ STATUS(objcopy)=0;
+ t_chashInsert(OID(objheader), objcopy);
#ifdef COMPILER
- return &objcopy[1];
+ return &objcopy[1];
#else
- return objcopy;
+ return objcopy;
#endif
- } else {
+ } else {
#ifdef CACHE
- , TYPE(header)if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+ , TYPE(header)if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
#ifdef TRANSSTATS
nprehashSearch++;
#endif
#else
return objcopy;
#endif
- }
+ }
#endif
/* Get the object from the remote location */
+
#ifdef DEBUG
- printf("%s-> Grab from remote machine\n", __func__);
+ printf("%s-> Grab from remote machine\n", __func__);
#endif
#ifdef RECOVERY
- //while(!liveHostsValid) {
- //}
- /*if(!liveHostsValid){
- sleep(WAIT_TIME);
- }*/
- unsigned int mindex = findHost(lhashSearch(oid));
- machinenumber = locateObjHosts[2*mindex+flipBit];
- flipBit ^= 1;
- printf("mindex:%d, oid:%d, machinenumber:%s\n", mindex, oid, midtoIPString(machinenumber));
+ transRetryFlag = 0;
+ unsigned int mindex = findHost(lhashSearch(oid));
+ machinenumber = locateObjHosts[2*mindex+flipBit];
+
+ if(numLiveHostsInSystem > 1)
+ flipBit ^= 1;
+ else
+ flipBit = 0;
+
+#ifdef DEBUG
+ printf("mindex:%d, oid:%d, machinenumber:%s\n", mindex, oid, midtoIPString(machinenumber));
+#endif
#else
- if((machinenumber = lhashSearch(oid)) == 0) {
- printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
- return NULL;
- }
+ if((machinenumber = lhashSearch(oid)) == 0) {
+ printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
+ return NULL;
+ }
+#endif
+ objcopy = getRemoteObj(machinenumber, oid);
+#ifdef RECOVERY
+ if(transRetryFlag) {
+ restoreDuplicationState(machinenumber);
+#ifdef DEBUG
+ printf("%s -> Recall transRead2\n",__func__);
+#endif
+ return transRead2(oid);
+ }
#endif
- objcopy = getRemoteObj(machinenumber, oid);
+ }
- if(objcopy == NULL) {
- printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
- return NULL;
- } else {
+ if(objcopy == NULL) {
+ printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ } else {
#ifdef TRANSSTATS
- nRemoteSend++;
+ nRemoteSend++;
#endif
#ifdef COMPILER
- return &objcopy[1];
+ return &objcopy[1];
#else
- return objcopy;
+ return objcopy;
+#endif
+ }
+#ifdef DEBUG
+ printf("%s -> Finished!!\n",__func__);
#endif
- }
- }
}
/* This function creates objects in the transaction record */
unsigned int size = c_size;
for(i = 0; i < size ; i++) {
- chashlistnode_t * curr = &ptr[i];
+ chashlistnode_t * curr = &ptr[i];
/* Inner loop to traverse the linked list of the cache lookupTable */
- while(curr != NULL) {
- //if the first bin in hash table is empty
- if(curr->key == 0)
- break;
- headeraddr=(objheader_t *) curr->val;
+ while(curr != NULL) {
+ //if the first bin in hash table is empty
+ if(curr->key == 0)
+ break;
+ headeraddr=(objheader_t *) curr->val;
#if RECOVERY
- oid = OID(headeraddr);
+ oid = OID(headeraddr);
#ifdef DEBUG
printf("%s-> oid:%u, version:%d, status:%d, type:%d\n", __func__, OID(headeraddr), headeraddr->version, STATUS(headeraddr), TYPE(headeraddr));
if (STATUS(headeraddr) & NEW) { // new/local object
printf("%s-> new/local object\n", __func__);
}
- else if ((mhashSearch(curr->key) != NULL)) { //local/nonnew
- if(STATUS(headeraddr) & DIRTY) { // modified
+ else if ((mhashSearch(curr->key) != NULL)) { //local/nonnew
+ if(STATUS(headeraddr) & DIRTY) { // modified
printf("%s-> old/local/mod object\n", __func__);
}
else { //read
printf("%s-> old/local/read object\n", __func__);
}
- } else if ((machinenum = lhashSearch(curr->key)) != 0) { // remote/nonnew object
+ }
+ else if ((machinenum = lhashSearch(curr->key)) != 0) { // remote/nonnew object
if(STATUS(headeraddr) & DIRTY) { //modified
printf("%s-> remote/local/mod object\n", __func__);
}
else { //read
printf("%s-> remote/local/read object\n", __func__);
}
- } else {
+ }
+ else {
printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
return NULL;
}
if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
machinenum = myIpAddr;
} else if ((machinenum = lhashSearch(curr->key)) == 0) {
- printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
- return NULL;
- }
-
- //Make machine groups
- pile = pInsert(pile, headeraddr, machinenum, c_numelements);
+ printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ //Make machine groups
+ pile = pInsert(pile, headeraddr, machinenum, c_numelements);
#endif
- curr = curr->next;
- }
- }
+ curr = curr->next;
+ }
+ }
return pile;
}
#else
int transCommit() {
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile, *pile_ptr;
- int trecvcount;
char treplyretry; /* keeps track of the common response that needs to be sent */
int firsttime=1;
trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
char finalResponse;
+ int deadsd = -1;
+ int deadmid = -1;
+ unsigned int transID = getNewTransID();
- int tmpTransIndex = (transIDIndex++)%25;
- liveTransactions[tmpTransIndex] = getNewTransID();
-
#ifdef DEBUG
- printf("%s-> Start, transID:%d\n", __func__, liveTransactions[tmpTransIndex]);
+ printf("%s -> Starts transCommit\n",__func__);
#endif
#ifdef ABORTREADERS
t_chashDelete();
#ifdef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
#endif
return 1;
}
#endif
-
do {
- trecvcount = 0;
treplyretry = 0;
/* Look through all the objects in the transaction record and make piles
tosend[sockindex].oidcreated = pile->oidcreated;
int sd = 0;
if(pile->mid != myIpAddr) {
-#ifdef RECOVERY
if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) {
-#else
- if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
-#endif
printf("\ntransRequest(): socket create error\n");
free(listmid);
free(tosend);
#ifdef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
#endif
return 1;
}
socklist[sockindex] = sd;
/* Send bytes of data with TRANS_REQUEST control message */
send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
- /*if(timeoutFlag) {
- printf("send_data: remote machine dead, line:%d\n", __LINE__);
- timeoutFlag = 0;
- exit(1);
- }*/
+
/* Send list of machines involved in the transaction */
{
int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
free(tosend);
#ifdef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
#endif
return 1;
}
free(tosend);
#ifdef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
#endif
return 1;
}
offset+=size;
}
send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+
+#ifdef RECOVERY
+ /* send transaction id, number of machine involved, machine ids */
+ send_data(sd, &transID, sizeof(unsigned int));
+#endif
free(modptr);
} else { //handle request locally
localReqsock = sockindex;
printf("%s-> Finished sending transaction read/mod objects\n",__func__);
#endif
int i;
+
for(i = 0; i < pilecount; i++) {
- printf("i:%d\n", i);
if(i == localReqsock)
continue;
int sd = socklist[i];
if(sd != 0) {
char control;
- recv_data(sd, &control, sizeof(char));
- /*if(timeoutFlag) {
- printf("recv_data: remote machine dead, timeoutFlag:%d, timeoutFlag:%d, line:%d\n", timeoutFlag, timeoutFlag, __LINE__);
- timeoutFlag = 0;
- exit(1);
- }*/
+ int timeout; // a variable to check if the connection is still alive. if it is -1, then need to transcommit again
+ timeout = recv_data(sd, &control, sizeof(char));
//Update common data structure with new ctrl msg
getReplyCtrl[i] = control;
/* Recv Objects if participant sends TRANS_DISAGREE */
#ifdef CACHE
if(control == TRANS_DISAGREE) {
int length;
- recv_data(sd, &length, sizeof(int));
+ timeout = recv_data(sd, &length, sizeof(int));
void *newAddr;
pthread_mutex_lock(&prefetchcache_mutex);
if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
pthread_mutex_unlock(&prefetchcache_mutex);
#ifdef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
#endif
return 1;
}
pthread_mutex_unlock(&prefetchcache_mutex);
- recv_data(sd, newAddr, length);
+ timeout = recv_data(sd, newAddr, length);
int offset = 0;
while(length != 0) {
unsigned int oidToPrefetch;
}
} //end of receiving objs
#endif
+
+#ifdef RECOVERY
+ if(timeout < 0) {
+#ifdef DEBUG
+ printf("%s -> TIMEOUT!!!!!!!\n",__func__);
+#endif
+
+ deadmid = listmid[i];
+ deadsd = sd;
+#ifdef DEBUG
+ printf("%s -> Dead Machine ID : %s\n",__func__,midtoIPString(deadmid));
+ printf("%s -> Dead SD : %d\n",__func__,sd);
+#endif
+ getReplyCtrl[i] = TRANS_DISAGREE;
+ }
+#endif
}
}
+
#ifdef DEBUG
printf("%s-> Decide final response now\n", __func__);
#endif
free(listmid);
#ifdef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
#endif
return 1;
}
#ifdef DEBUG
printf("%s-> Final Response: %d\n", __func__, (int)finalResponse);
#endif
+
/* Send responses to all machines */
for(i = 0; i < pilecount; i++) {
int sd = socklist[i];
- if(sd != 0) {
+
+ if(sd != deadsd) {
+ if(sd != 0) {
#ifdef CACHE
- if(finalResponse == TRANS_COMMIT) {
- int retval;
- /* Update prefetch cache */
- if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
- printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- free(tosend);
- free(listmid);
-#ifdef DEBUG
- printf("%s-> End, line:%d\n\n", __func__, __LINE__);
+ if(finalResponse == TRANS_COMMIT) {
+ int retval;
+ /* Update prefetch cache */
+ if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+#ifdef DEBUG
+ printf("%s-> End, line:%d\n\n", __func__, __LINE__);
+#endif
+ return 1;
+ }
+
+ /* Invalidate objects in other machine cache */
+ if(tosend[i].f.nummod > 0) {
+ if((retval = invalidateObj(&(tosend[i]))) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+#ifdef DEBUG
+ printf("%s-> End, line:%d\n\n", __func__, __LINE__);
+#endif
+ return 1;
+ }
+ }
+#ifdef ABORTREADERS
+ removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
#endif
-#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
+ }
+#ifdef ABORTREADERS
+ else if (!treplyretry) {
+ removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+ }
#endif
- return 1;
- }
-
-
- /* Invalidate objects in other machine cache */
- if(tosend[i].f.nummod > 0) {
- if((retval = invalidateObj(&(tosend[i]))) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- free(tosend);
- free(listmid);
+#endif
+ send_data(sd,&finalResponse,sizeof(char));
#ifdef DEBUG
- printf("%s-> End, line:%d\n\n", __func__, __LINE__);
+ printf("%s -> Decision Sent to %s\n",__func__,midtoIPString(listmid[i]));
#endif
+
+ } else {
+ /* Complete local processing */
#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
-#endif
- return 1;
- }
- }
-#ifdef ABORTREADERS
- removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
- removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
+ thashInsert(transID,finalResponse);
#endif
- }
+ doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
+
#ifdef ABORTREADERS
- else if (!treplyretry) {
- removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
- removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
- }
-#endif
-#endif
- send_data(sd, &finalResponse, sizeof(char));
- } else {
- /* Complete local processing */
- doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
+ if(finalResponse == TRANS_COMMIT) {
+ removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+ } else if (!treplyretry) {
+ removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+ }
+#endif
+ }
+ } else {
#ifdef ABORTREADERS
- if(finalResponse == TRANS_COMMIT) {
- removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
- removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
- } else if (!treplyretry) {
- removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
- removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
- }
+ removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
#endif
- }
+ }
}
-#ifdef RECOVERY
+
#ifdef DEBUG
printf("%s-> Free sockets\n", __func__);
#endif
freeSockWithLock(transRequestSockPool, listmid[i], socklist[i]);
}
}
-#endif
+
/* Free resources */
free(tosend);
free(listmid);
nSoftAbort++;
#endif
}
- /* Retry trans commit procedure during soft_abort case */
- } while (treplyretry);
+ } while (treplyretry && deadmid != -1);
if(finalResponse == TRANS_ABORT) {
- //printf("Aborting trans\n");
+
#ifdef TRANSSTATS
numTransAbort++;
#endif
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
#endif
#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
+ if(deadmid != -1) { /* if deadmid is greater than or equal to 0,
+ then there is dead machine. */
+#ifdef DEBUG
+ printf("%s -> Dead machine Detected : %s\n",__func__,midtoIPString(deadmid));
+#endif
+ restoreDuplicationState(deadmid);
+#ifdef DEBUG
+ printf("%s -> Duplication completed\n",__func__);
+#endif
+ }
#endif
return TRANS_ABORT;
} else if(finalResponse == TRANS_COMMIT) {
t_chashDelete();
#ifdef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
#endif
return 0;
} else {
printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
#ifdef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
#endif
exit(-1);
}
#ifdef DEBUG
printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
- liveTransactions[tmpTransIndex] = 0;
#endif
return 0;
}
/* Condition to send TRANS_AGREE */
if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
+#ifdef DEBUG
+ printf("%s -> TRANS_AGREE\n",__func__);
+#endif
*getReplyCtrl = TRANS_AGREE;
}
/* Condition to send TRANS_SOFT_ABORT */
if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
+#ifdef DEBUG
+ printf("%s -> TRANS_SOFT_ABORT\n",__func__);
+#endif
*getReplyCtrl = TRANS_SOFT_ABORT;
}
}
void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
+
if(finalResponse == TRANS_ABORT) {
if(transAbortProcess(transinfo) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
objheader_t *h;
void *objcopy = NULL;
- int sd = getSock2(transReadSockPool, mnum);
- char readrequest[sizeof(char)+sizeof(unsigned int)];
- readrequest[0] = READ_REQUEST;
- *((unsigned int *)(&readrequest[1])) = oid;
- send_data(sd, readrequest, sizeof(readrequest));
+ int sd;
+ int flag;
+ if((sd = getSock2(transReadSockPool, mnum)) != -1) {
+ char readrequest[sizeof(char)+sizeof(unsigned int)];
+ readrequest[0] = READ_REQUEST;
+ *((unsigned int *)(&readrequest[1])) = oid;
+ send_data(sd, readrequest, sizeof(readrequest));
+ }
+ else {
+ printf("%s -> creating socket error\n",__func__);
+ }
+
/* Read response from the Participant */
- recv_data(sd, &control, sizeof(char));
+ if(recv_data(sd, &control, sizeof(char)) < 0) {
+ transRetryFlag = 1;
+ return NULL;
+ }
if (control==OBJECT_NOT_FOUND) {
objcopy = NULL;
} else if(control==OBJECT_FOUND) {
- /* Read object if found into local cache */
- recv_data(sd, &size, sizeof(int));
- objcopy = objstrAlloc(&t_cache, size);
- recv_data(sd, objcopy, size);
- STATUS(objcopy)=0;
- /* Insert into cache's lookup table */
- t_chashInsert(oid, objcopy);
-#ifdef TRANSSTATS
- totalObjSize += size;
-#endif
- }
+
+ /* Read object if found into local cache */
-#ifdef RECOVERY
- if( detectMachineFailure(mnum) ) { //check for timeouts
- printf("looking for oid:%d\n", oid);
- restoreDuplicationState(mnum); // suspect machine failure, restore state
+ if(recv_data(sd, &size, sizeof(int)) < 0) {
+ transRetryFlag = 1;
+ return NULL;
+ }
- objheader_t *temp;
- temp = transRead2(oid); // retry transRead
-#ifdef COMPILER
- temp -= 1; // return object w/ objheader
+ objcopy = objstrAlloc(&t_cache, size);
+
+ if(recv_data(sd, objcopy, size) < 0) {
+ transRetryFlag = 1;
+ return NULL;
+ }
+
+ STATUS(objcopy)=0;
+
+ /* Insert into cache's lookup table */
+ t_chashInsert(oid, objcopy);
+#ifdef TRANSSTATS
+ totalObjSize += size;
#endif
- return (void *)temp;
}
-#endif
return objcopy;
}
-int detectMachineFailure(unsigned int mid) {
- if(timeoutFlag == 1) {
+#ifdef RECOVERY
+/* Ask the machines in listmid, in order, whether they received the decision
+ * for transaction transID: sends ASK_COMMIT followed by transID to each host
+ * and stops at the first one that answers.
+ * Returns the answered byte, or TRANS_ABORT when response is still -1
+ * (no socket could be obtained or no host answered). */
+char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *listmid)
+{
#ifdef DEBUG
- printf("%s-> Suspect machine failure: [%s]\n", __func__, midtoIPString(mid));
+ printf("%s -> Entering\n",__func__);
#endif
- timeoutFlag = 0;
- return 1;
- }
- else
- return 0;
+
+ int sd; // socket id
+ int i;
+ // start at -1 so "nobody answered" is well defined and maps to TRANS_ABORT below
+ char response = -1;
+
+ for(i = 0; i < nummid; i++) {
+ if((sd = getSock(transReadSockPool, listmid[i])) < 0) {
+ printf("%s -> socket Error!!\n",__func__);
+ }
+ else {
+ char control = ASK_COMMIT;
+
+ send_data(sd,&control, sizeof(char));
+ send_data(sd,&transID, sizeof(unsigned int));
+
+ // return -1 if it didn't receive the response
+ int timeout = recv_data(sd,&response, sizeof(char));
+
+ // hand the socket back to the pool whether or not this host answered
+ freeSock(transReadSockPool, listmid[i],sd);
+
+ if(timeout == 0 || response > 0)
+ break; // received response
+
+ // else check next machine
+ }
+ }
+#ifdef DEBUG
+ printf("%s -> response : %d\n",__func__,response);
+#endif
+
+ return (response==-1)?TRANS_ABORT:response;
}
+#endif
+#ifdef RECOVERY
void restoreDuplicationState(unsigned int deadHost) {
int sd;
char ctrl;
sleep(WAIT_TIME);
return;
}
+
if(deadHost == leader)
paxos();
#ifdef DEBUG
- printf("%s-> leader?:%s, me?:%d\n", __func__, midtoIPString(leader), (myIpAddr == leader));
+ printf("%s-> leader?:%s, me?:%s\n", __func__, midtoIPString(leader), (myIpAddr == leader)?"LEADER":"NOT LEADER");
#endif
if(leader == myIpAddr) {
if(!leaderFixing) {
leaderFixing = 1;
pthread_mutex_unlock(&leaderFixing_mutex);
- //fixit
- updateLiveHosts();
- if(!liveHosts[findHost(deadHost)]) { //confirmed dead
- duplicateLostObjects(deadHost);
- }
- if(updateLiveHostsCommit() != 0) {
- printf("error updateLiveHostsCommit()\n");
- exit(1);
- }
- pthread_mutex_lock(&leaderFixing_mutex);
- leaderFixing = 0;
- pthread_mutex_unlock(&leaderFixing_mutex);
+ if(!liveHosts[findHost(deadHost)]) {
+#ifdef DEBUG
+ printf("%s -> already fixed\n",__func__);
+#endif
+ pthread_mutex_lock(&leaderFixing_mutex);
+ leaderFixing =0;
+ pthread_mutex_unlock(&leaderFixing_mutex);
+ }
+ else {
+ updateLiveHosts();
+ duplicateLostObjects(deadHost);
+
+ if(updateLiveHostsCommit() != 0) {
+ printf("%s -> error updateLiveHostsCommit()\n",__func__);
+ exit(1);
+ }
+ pthread_mutex_lock(&leaderFixing_mutex);
+ leaderFixing = 0;
+ pthread_mutex_unlock(&leaderFixing_mutex);
+ }
}
else {
pthread_mutex_unlock(&leaderFixing_mutex);
+#ifdef DEBUG
+ printf("%s (REMOTE_RESTORE_DUPLICATED_STATE -> LEADER is already fixing\n",__func__);
+#endif
sleep(WAIT_TIME);
- //while(leaderFixing);
- return;
}
}
else {
- if((sd = getSock2WithLock(transRequestSockPool, leader)) < 0) {
- printf("restoreDuplicationState(): socket create error\n");
+ if((sd = getSockWithLock(transPrefetchSockPool, leader)) < 0) {
+ printf("%s -> socket create error\n",__func__);
exit(-1);
}
ctrl = REMOTE_RESTORE_DUPLICATED_STATE;
send_data(sd, &ctrl, sizeof(char));
send_data(sd, &deadHost, sizeof(unsigned int));
- recv_data(sd, &ctrl, sizeof(char));
+ freeSockWithLock(transPrefetchSockPool,leader,sd);
sleep(WAIT_TIME);
- return;
}
+
+ printf("%s -> Finished!\n",__func__);
}
+#endif
+
/* Commit info for objects modified */
void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
numlocked = transinfo->numlocked;
oidlocked = transinfo->objlocked;
-
#ifdef DEBUG
printf("%s-> nummod: %d, numcreated: %d, numlocked: %d\n", __func__, nummod, numcreated, numlocked);
#endif
header->version += 1;
//printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
if(header->notifylist != NULL) {
+#ifdef DEBUG
+ printf("%s -> type : %d notifylist : %d\n",__func__,TYPE(header),header->notifylist);
+#endif
+#ifdef RECOVERY
+ if(header->isBackup != 0)
+ notifyAll(&header->notifylist, OID(header), header->version);
+ else
+ clearNotifyList(OID(header));
+#else
notifyAll(&header->notifylist, OID(header), header->version);
+#endif
}
}
/* If object is newly created inside transaction then commit it */
char *token;
const char *delimiters = " \t\n";
char *commentBegin;
+ unsigned int i;
in_addr_t tmpAddr;
configFile = fopen(CONFIG_FILENAME, "r");
#ifdef RECOVERY
liveHosts = calloc(sizeOfHostArray, sizeof(unsigned int));
locateObjHosts = calloc(sizeOfHostArray*2, sizeof(unsigned int));
+
liveHostsValid = 0;
#endif
transIDMin = oidMin;
transIDMax = oidMax;
+ waitThreadID = -1;
+ waitThreadMid = -1;
+
return 0;
}
+#ifdef RECOVERY
unsigned int getDuplicatedPrimaryMachine(unsigned int mid) {
int i;
for(i = 0; i < numHostsInSystem; i++) {
return bmid;
}
+/* Returns liveHosts[mid]; mid is used directly as an index into liveHosts
+ * (and into hostIpAddrs for the trace). With DEBUG defined it also prints
+ * LIVE when that entry equals 1 and DEAD otherwise. */
+int getStatus(int mid) {
+#ifdef DEBUG
+ printf("%s -> host %s : %s\n",__func__,midtoIPString(hostIpAddrs[mid]),(liveHosts[mid] == 1)?"LIVE":"DEAD");
+#endif
+ return liveHosts[mid];
+}
+#endif
+
+#ifdef RECOVERY
// updates the leader's liveHostArray and locateObj
-void updateLiveHosts() {
+unsigned int updateLiveHosts() { /* Probe every other host with a RESPOND_LIVE request and refresh
+ * liveHosts[] and numLiveHostsInSystem from the replies. Returns deadhost: the index of the
+ * last host that went from live to dead in this pass. When no host did, deadhost is still -1,
+ * which the unsigned return type converts to UINT_MAX. */
#ifdef DEBUG
- printf("%s-> Entering updateLiveHosts\n", __func__);
+ printf("%s-> Entering updateLiveHosts\n", __func__);
#endif
// update everyone's list
- liveHostsValid = 0;
- //int *tmpLiveHosts = calloc(sizeOfHostArray, sizeof(unsigned int));
- //foreach in hostipaddrs, ping -> update list of livemachines
- //socket connection?
-
- //liveHosts lock here
- int sd = 0, i, j, tmpNumLiveHosts = 0;
- for(i = 0; i < numHostsInSystem; i++) {
- if(i == myIndexInHostArray)
- {
- tmpNumLiveHosts++;
- continue;
- }
- for(j = 0; j < 5; j++) { // hard define num of retries
- if((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
- printf("updateLiveHosts(): Cannot create socket connection to [%s], attempt %d\n", __func__, midtoIPString(hostIpAddrs[i]), j);
- usleep(1000);
- if(j == 4)
- liveHosts[i] = 0;
- continue;
- }
- char liverequest[sizeof(char)];
- liverequest[0] = RESPOND_LIVE;
-
- send_data(sd, &liverequest[0], sizeof(liverequest));
- char response = 0;
- recv_data(sd, &response, sizeof(response));
-
- //try to send msg
- //if timeout, dead host
- printf("YES received %d\n", response);
- if(response == LIVE) {
- printf("must enter here\n");
- liveHosts[i] = 1;
- tmpNumLiveHosts++;
- //locateObjHosts[i*2] = hostIpAddrs[i];
- }
- else {
- printf("or here\n");
- liveHosts[i] = 0;
- timeoutFlag = 0;
- }
- break;
-
+ liveHostsValid = 0; // cleared while the table is rebuilt; updateLiveHostsCommit() sets it back to 1
+
+ // Contact each entry of hostIpAddrs in turn to rebuild the list of live machines.
+ // Connections are taken from transPrefetchSockPool.
+
+ int deadhost = -1; // -1 means no live->dead transition seen so far
+ // NOTE(review): no mutex is acquired in this function before liveHosts[] is written.
+ int sd = 0, i, j, tmpNumLiveHosts = 0;
+ for(i = 0; i < numHostsInSystem; i++) {
+ if(i == myIndexInHostArray)
+ {
+ tmpNumLiveHosts++; // this machine counts itself as live without probing
+ continue;
+ }
+ for(j = 0; j < 5; j++) { // at most 5 attempts to obtain a socket (hard-coded)
+ if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
+#ifdef DEBUG
+ printf("%s -> Cannot create socket connection to [%s], attempt %d\n", __func__, midtoIPString(hostIpAddrs[i]), j);
+#endif
+ usleep(1000); // brief pause before the next attempt
+
+ if(j == 4) { // fifth and final attempt failed
+ if(liveHosts[i]) { // only a host that was live before is recorded as newly dead
+ liveHosts[i] = 0;
+ deadhost = i;
+ }
+ }
+ continue;
+ }
+
+ char liverequest[sizeof(char)];
+ liverequest[0] = RESPOND_LIVE;
+
+ send_data(sd, &liverequest[0], sizeof(liverequest));
+
+ char response = 0;
+ int timeout = recv_data(sd, &response, sizeof(response)); // NOTE(review): timeout is assigned but never read
+
+ // A LIVE reply marks the host alive and counts it.
+ // Any other value of response (it was initialised to 0) marks the host dead.
+ if(response == LIVE) {
+ liveHosts[i] = 1;
+ tmpNumLiveHosts++;
}
- if(liveHosts[i] == 0)
- printf("updateLiveHosts(): cannot make connection to machine %s\n", midtoIPString(hostIpAddrs[i]));
+ else {
+ if(liveHosts[i]) {
+ liveHosts[i] = 0;
+ deadhost = i;
+ }
+ }
+ freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
+ break; // a socket was obtained, so no further attempts for this host
}
- numLiveHostsInSystem = tmpNumLiveHosts;
- printf("numLiveHostsInSystem:%d\n", numLiveHostsInSystem);
- //have updated list of live machines
+#ifdef DEBUG
+ if(liveHosts[i] == 0) // the guarded statement is the printf after the blank line
+
+ printf("updateLiveHosts(): cannot make connection to machine %s\n", midtoIPString(hostIpAddrs[i]));
+#endif
+ }
+ numLiveHostsInSystem = tmpNumLiveHosts;
+#ifdef DEBUG
+ printf("numLiveHostsInSystem:%d\n", numLiveHostsInSystem);
+#endif
+ //have updated list of live machines
#ifdef DEBUG
printf("%s-> Exiting updateLiveHosts\n", __func__);
printHostsStatus();
#endif
+
+ return deadhost;
}
int getNumLiveHostsInSystem() {
}
int updateLiveHostsCommit() {
- int sd = 0, i;
+#ifdef DEBUG
+ printf("%s -> Enter\n",__func__);
+#endif
+ int sd = 0, i; /* Sends this machine's liveHosts[] and locateObjHosts[] tables in one
+ * UPDATE_LIVE_HOSTS message to every other host currently marked live, then sets
+ * liveHostsValid to 1. Returns 0 on success, or -1 as soon as a socket cannot be obtained. */
- char updaterequest[sizeof(char)+sizeof(int)*numHostsInSystem+sizeof(unsigned int)*(numHostsInSystem*2)];
- updaterequest[0] = UPDATE_LIVE_HOSTS;
+ char updaterequest[sizeof(char)+sizeof(int)*numHostsInSystem+sizeof(unsigned int)*(numHostsInSystem*2)]; // 1 control byte + one int per host + two unsigned ints per host
+
+ updaterequest[0] = UPDATE_LIVE_HOSTS;
- for(i = 0; i < numHostsInSystem; i++) {
- *((int *)(&updaterequest[i*4+1])) = liveHosts[i]; // clean this up later
- }
-
- for(i = 0; i < numHostsInSystem*2; i++) {
- *((unsigned int *)(&updaterequest[i*4+(numHostsInSystem*4)+1])) = locateObjHosts[i]; //ditto
- }
-
- //for each machine send data
- for(i = 1; i < numHostsInSystem; i++) { // hard define num of retries
- if(i == myIndexInHostArray)
- continue;
- if(liveHosts[i] == 1) {
- if((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
- printf("updateLiveHosts(): socket create error, attempt %d\n", i);
- return -1;
- }
- send_data(sd, updaterequest, sizeof(updaterequest));
- }
- }
- liveHostsValid = 1;
- printHostsStatus();
- return 0;
-}
+ for(i = 0; i < numHostsInSystem; i++) {
+ *((int *)(&updaterequest[i*4+1])) = liveHosts[i]; // stride of 4 assumes sizeof(int) == 4; clean this up later
+ }
-/*void updateLocateObjHosts(unsigned int failedmid) {
- int failedmidIndex = findHost(failedmid);
- int i = 0, validIndex = 0;
+ for(i = 0; i < numHostsInSystem*2; i++) {
+ *((unsigned int *)(&updaterequest[i*4+(numHostsInSystem*4)+1])) = locateObjHosts[i]; // stored after the liveHosts section, same 4-byte stride
+ }
- for(; i < numHostsInSystem; i++) {
- if(locateObjHosts[(i*2)] == failedmid) {
- while(liveHosts[(i+validIndex)%numHostsInSystem] == 0)
- validIndex++;
- locateObjHosts[(i*2)] = hostIpAddrs[(i+validIndex)%numHostsInSystem];
- validIndex++;
- while(liveHosts[(i+validIndex)%numHostsInSystem] == 0)
- validIndex++;
- locateObjHosts[(i*2)+1] = hostIpAddrs[(i+validIndex)%numHostsInSystem];
- }
- else if(locateObjHosts[(i*2)+1] == failedmid) {
- while(liveHosts[(i+validIndex)%numHostsInSystem] == 0)
- validIndex++;
- locateObjHosts[(i*2)+1] = hostIpAddrs[(i+validIndex)%numHostsInSystem];
- validIndex = 0;
+ // Send the message to every other host that is marked live.
+
+ for(i = 0; i < numHostsInSystem; i++) { // one pass over the hosts (there is no retry loop here)
+ if(i == myIndexInHostArray)
+ continue;
+ if(liveHosts[i] == 1) {
+ if((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
+ printf("%s -> socket create error, attempt %d\n",__func__, i);
+ return -1; // hosts not yet reached get no update and liveHostsValid is left as it was
+ }
+ send_data(sd, updaterequest, sizeof(updaterequest)); // no reply is read back
+ freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
}
}
-}*/
+ liveHostsValid = 1;
+#ifdef DEBUG
+ printHostsStatus();
+ printf("%s -> Finish\n",__func__);
+#endif
+
+ return 0;
+}
void setLocateObjHosts() {
int i = 0, validIndex = 0;
}
}
+/* Rewrite locateObjHosts[] after machine mid has failed.
+ * locateObjHosts holds two entries per host: slot 2*h is printed as "original"
+ * and slot 2*h+1 as "backup" by printHostsStatus().
+ * The machine returned by getDuplicatedPrimaryMachine(mid) takes over as
+ * primary for mid's slot, and the machine returned by getBackupMachine(mid)
+ * becomes that machine's backup. Afterwards every remaining reference to mid
+ * is replaced in the same way.
+ * NOTE(review): the indexes returned by findHost() are used unchecked. */
+void setReLocateObjHosts(int mid)
+{
+  int deadIdx = findHost(mid);
+  int backupOfDead = getBackupMachine(mid);
+  int takeover = getDuplicatedPrimaryMachine(mid);
+  int takeoverIdx = findHost(takeover);
+  int host;
+
+  locateObjHosts[2*takeoverIdx+1] = backupOfDead;
+  locateObjHosts[2*deadIdx] = takeover;
+
+  /* relocate the objects of the machines already dead */
+  for(host = 0; host < numHostsInSystem; host++) {
+    unsigned int *entry = &locateObjHosts[2*host];
+
+    if(entry[0] == mid)
+      entry[0] = takeover;
+    if(entry[1] == mid)
+      entry[1] = backupOfDead;
+  }
+}
+
+
// Debug helper: for each of the numHostsInSystem hosts, prints LIVE or DEAD
// according to liveHosts[i], followed by the locateObjHosts[2*i] ("original")
// and locateObjHosts[2*i+1] ("backup") entries for that host.
void printHostsStatus() {
int i;
-#ifdef DEBUG
printf("%s-> *printing live machines and backups*\n", __func__);
-#endif
for(i = 0; i < numHostsInSystem; i++) {
if(liveHosts[i]) {
-#ifdef DEBUG
printf("%s-> [%s]: LIVE\n", __func__, midtoIPString(hostIpAddrs[i]));
-#endif
}
else {
-#ifdef DEBUG
printf("%s-> [%s]: DEAD\n", __func__, midtoIPString(hostIpAddrs[i]));
-#endif
}
-#ifdef DEBUG
printf("%s-> original:\t[%s]\n", __func__, midtoIPString(locateObjHosts[i*2]));
printf("%s-> backup:\t[%s]\n", __func__, midtoIPString(locateObjHosts[i*2+1]));
-#endif
}
}
void duplicateLostObjects(unsigned int mid){
-#ifdef DEBUG
printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid));
-#endif
//this needs to be changed.
- unsigned int backupMid = getBackupMachine(mid);
- unsigned int originalMid = getDuplicatedPrimaryMachine(mid);
+ unsigned int backupMid = getBackupMachine(mid); // get backup machine of dead machine
+ unsigned int originalMid = getDuplicatedPrimaryMachine(mid); // get primary machine that used deadmachine as backup machine.
#ifdef DEBUG
printf("%s-> backupMid: [%s], ", __func__, midtoIPString(backupMid));
printf("originalMid: [%s]\n", midtoIPString(originalMid));
+ printHostsStatus();
#endif
- setLocateObjHosts();
- printHostsStatus();
+ setReLocateObjHosts(mid);
//connect to these machines
//go through their object store copying necessary (in a transaction)
//transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
int sd = 0, i, j, tmpNumLiveHosts = 0;
- if(originalMid == myIpAddr) {
- originalMid = getPrimaryMachine(mid);
- printf("originalMid: [%s]\n", midtoIPString(originalMid));
- duplicateLocalOriginalObjects(originalMid);
+ /* duplicateLostObject example
+ * Before M24 die,
+ * MID 21 24 26
+ * Primary 21 24 26
+ * Backup 26 21 24
+ * After M24 die,
+ * MID 21 26
+ * Primary 21,24 26
+ * Backup 26 21,24
+ */
+
+ if(originalMid == myIpAddr) { // copy local machine's backup data, make it as primary data of backup machine.
+ duplicateLocalOriginalObjects(backupMid);
}
- else if((sd = getSock2WithLock(transRequestSockPool, originalMid)) < 0) {
- printf("updateLiveHosts(): socket create error, attempt %d\n", j);
+ else if((sd = getSockWithLock(transPrefetchSockPool, originalMid)) < 0) {
+ printf("%s -> socket create error, attempt %d\n", __func__,j);
+ exit(0);
//usleep(1000);
}
- else {
+ else { // if original is not local
char duperequest;
duperequest = DUPLICATE_ORIGINAL;
send_data(sd, &duperequest, sizeof(char));
#ifdef DEBUG
- printf("%s-> Sent DUPLICATE_ORIGINAL request\n", __func__);
+ printf("%s-> SD : %d Sent DUPLICATE_ORIGINAL request to %s\n", __func__,sd,midtoIPString(originalMid));
#endif
- originalMid = getPrimaryMachine(mid);
- printf("originalMid: [%s]\n", midtoIPString(originalMid));
- send_data(sd, &originalMid, sizeof(unsigned int));
+ send_data(sd, &backupMid, sizeof(unsigned int));
+
+ char response;
+ recv_data(sd, &response, sizeof(char));
#ifdef DEBUG
- printf("%s-> Sent originalMid\n", __func__);
+ printf("%s (DUPLICATE_ORIGINAL) -> Received %s\n", __func__,(response==DUPLICATION_COMPLETE)?"DUPLICATION_COMPLETE":"DUPLICATION_FAIL");
#endif
- char response;
- recv_data_block(sd, &response, sizeof(char));
- printf("YES! Received %d\n", response);
- }
- if(backupMid == myIpAddr) {
- backupMid = getBackupMachine(mid);
- duplicateLocalBackupObjects(backupMid);
+ freeSockWithLock(transPrefetchSockPool, originalMid, sd);
+ }
+
+ if(backupMid == myIpAddr) { // copy local machine's primary data, and make it as backup data of original machine.
+ duplicateLocalBackupObjects(originalMid);
}
- else if((sd = getSock2WithLock(transRequestSockPool, backupMid)) < 0) {
+ else if((sd = getSockWithLock(transPrefetchSockPool, backupMid)) < 0) {
printf("updateLiveHosts(): socket create error, attempt %d\n", j);
exit(1);
}
duperequest = DUPLICATE_BACKUP;
send_data(sd, &duperequest, sizeof(char));
#ifdef DEBUG
- printf("%s-> Sent DUPLICATE_BACKUP request\n", __func__);
+ printf("%s-> SD : %d Sent DUPLICATE_BACKUP request to %s\n", __func__,sd,midtoIPString(backupMid));
#endif
- backupMid = getBackupMachine(mid);
- send_data(sd, &backupMid, sizeof(unsigned int));
+ send_data(sd, &originalMid, sizeof(unsigned int));
+
+ char response;
+ recv_data(sd, &response, sizeof(char));
#ifdef DEBUG
- printf("%s-> Sent backupMid\n", __func__);
+ printf("%s (DUPLICATE_BACKUP) -> Received %s\n", __func__,(response==DUPLICATION_COMPLETE)?"DUPLICATION_COMPLETE":"DUPLICATION_FAIL");
#endif
- char response;
- recv_data_block(sd, &response, sizeof(char));
- printf("YES! Received %d\n", response);
+ freeSockWithLock(transPrefetchSockPool, backupMid, sd);
}
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> End\n", __func__);
#endif
}
void duplicateLocalBackupObjects(unsigned int mid) {
int tempsize, sd;
+ int i;
char *dupeptr, ctrl, response;
-
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> Start; backup mid:%s\n", __func__, midtoIPString(mid));
#endif
+
//copy code from dstmserver here
- tempsize = mhashGetDuplicate(&dupeptr, 1);
+ tempsize = mhashGetDuplicate((void**)&dupeptr, 1);
+#ifdef DEBUG
printf("tempsize:%d, dupeptrfirstvalue:%d\n", tempsize, *((unsigned int *)(dupeptr)));
+#endif
//send control and dupes after
ctrl = RECEIVE_DUPES;
- if((sd = getSockWithLock(transRequestSockPool, mid)) < 0) {
+ if((sd = getSockWithLock(transPrefetchSockPool, mid)) < 0) {
printf("duplicatelocalbackup: socket create error\n");
//usleep(1000);
}
-
- printf("sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", sd, tempsize, *((unsigned int *)(dupeptr)));
+#ifdef DEBUG
+ printf("%s -> sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", __func__,sd, tempsize, *((unsigned int *)(dupeptr)));
+#endif
send_data(sd, &ctrl, sizeof(char));
send_data(sd, dupeptr, tempsize);
- recv_data(sd, &response, sizeof(char));
+
+ recv_data(sd, &response, sizeof(char));
+ freeSockWithLock(transPrefetchSockPool,mid,sd);
+
+#ifdef DEBUG
+ printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE);
+#endif
+
if(response != DUPLICATION_COMPLETE) {
- //fail message
+#ifndef DEBUG
+ printf("%s -> DUPLICATION_FAIL\n",__func__);
+#endif
+ exit(-1);
}
+ free(dupeptr);
- freeSockWithLock(transRequestSockPool, mid, sd);
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> End\n", __func__);
#endif
int tempsize, sd;
char *dupeptr, ctrl, response;
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> Start\n", __func__);
#endif
//copy code fom dstmserver here
- tempsize = mhashGetDuplicate(&dupeptr, 0);
+ tempsize = mhashGetDuplicate((void**)&dupeptr, 0);
//send control and dupes after
ctrl = RECEIVE_DUPES;
- if((sd = getSockWithLock(transRequestSockPool, mid)) < 0) {
+ if((sd = getSockWithLock(transPrefetchSockPool, mid)) < 0) {
printf("DUPLICATE_ORIGINAL: socket create error\n");
//usleep(1000);
}
+#ifdef DEBUG
printf("sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", sd, tempsize, *((unsigned int *)(dupeptr)));
+#endif
send_data(sd, &ctrl, sizeof(char));
send_data(sd, dupeptr, tempsize);
recv_data(sd, &response, sizeof(char));
+ freeSockWithLock(transPrefetchSockPool,mid,sd);
+
+#ifdef DEBUG
+ printf("%s ->response : %d - %d\n",__func__,response,DUPLICATION_COMPLETE);
+#endif
+
if(response != DUPLICATION_COMPLETE) {
//fail message
+#ifndef DEBUG
+ printf("%s -> DUPLICATION_FAIL\n",__func__);
+#endif
+ exit(0);
}
- freeSockWithLock(transRequestSockPool, mid, sd);
+
+ free(dupeptr);
-#ifdef DEBUG
+#ifndef DEBUG
printf("%s-> End\n", __func__);
#endif
}
+#endif
+
void addHost(unsigned int hostIp) {
unsigned int *tmpArray;
int *tmpliveHostsArray;
free(hostIpAddrs);
hostIpAddrs = tmpArray;
+#ifdef RECOVERY
tmpliveHostsArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
memcpy(tmpliveHostsArray, liveHosts, sizeof(unsigned int) * numHostsInSystem);
free(liveHosts);
memcpy(tmplocateObjHostsArray, locateObjHosts, sizeof(unsigned int) * numHostsInSystem);
free(locateObjHosts);
locateObjHosts = tmplocateObjHostsArray;
-
+#endif
sizeOfHostArray *= 2;
}
hostIpAddrs[numHostsInSystem] = hostIp;
+
+#ifdef RECOVERY
liveHosts[numHostsInSystem] = 0;
locateObjHosts[numHostsInSystem*2] = hostIp;
+#endif
numHostsInSystem++;
return;
/* This function sends notification request per thread waiting on object(s) whose version
* changes */
+#ifdef RECOVERY
+int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid, int waitmid) {
+#else
int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
- int sock,i;
+#endif
+ int psock,i;
objheader_t *objheader;
- struct sockaddr_in remoteAddr;
+ struct sockaddr_in premoteAddr;
char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)];
char *ptr;
- int bytesSent;
int status, size;
unsigned short version;
unsigned int oid,mid;
pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
notifydata_t *ndata;
+#ifdef RECOVERY
+ int bsock;
+ struct sockaddr_in bremoteAddr;
+#endif
+
oid = oidarry[0];
+
if((mid = lhashSearch(oid)) == 0) {
printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
return;
}
+#ifdef RECOVERY
+ int pmid = getPrimaryMachine(mid);
+ int bmid = getBackupMachine(mid);
+#else
+ int pmid = mid;
+#endif
- if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+#ifdef RECOVERY
+ if ((psock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ||
+ (bsock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ) {
+#else
+ if ((psock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+#endif
perror("reqNotify():socket()");
return -1;
}
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
+ /* for primary machine */
+ bzero(&premoteAddr, sizeof(premoteAddr));
+ premoteAddr.sin_family = AF_INET;
+ premoteAddr.sin_port = htons(LISTEN_PORT);
+ premoteAddr.sin_addr.s_addr = htonl(pmid);
+#ifdef RECOVERY
+ /* for backup machine */
+ bzero(&bremoteAddr, sizeof(bremoteAddr));
+ bremoteAddr.sin_family = AF_INET;
+ bremoteAddr.sin_port = htons(LISTEN_PORT);
+ bremoteAddr.sin_addr.s_addr = htonl(bmid);
+#endif
/* Generate unique threadid */
threadid++;
-
+
/* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
}
/* Send number of oids, oidarry, version array, machine id and threadid */
- if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+#ifdef RECOVERY
+ if ((connect(psock, (struct sockaddr *)&premoteAddr, sizeof(premoteAddr))< 0) ||
+ (connect(bsock, (struct sockaddr *)&bremoteAddr, sizeof(bremoteAddr))< 0)) {
+#else
+ if ((connect(psock, (struct sockaddr *)&premoteAddr, sizeof(premoteAddr))< 0)) {
+#endif
printf("reqNotify():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ inet_ntoa(premoteAddr.sin_addr), LISTEN_PORT);
free(ndata);
return -1;
} else {
+
+#ifdef DEBUG
+ printf("%s -> Pmid = %s\n",__func__,midtoIPString(pmid));
+#ifdef RECOVERY
+ printf("%s -> Bmid = %s\n",__func__,midtoIPString(bmid));
+#endif
+#endif
+
msg[0] = THREAD_NOTIFY_REQUEST;
+
*((unsigned int *)(&msg[1])) = numoid;
/* Send array of oids */
size = sizeof(unsigned int);
for(i = 0;i < numoid; i++) {
oid = oidarry[i];
+#ifdef DEBUG
+ printf("%s -> oid[%d] = %d\n",__func__,i,oidarry[i]);
+#endif
*((unsigned int *)(&msg[1] + size)) = oid;
size += sizeof(unsigned int);
}
size += sizeof(unsigned short);
}
- *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
+ *((unsigned int *)(&msg[1] + size)) = myIpAddr;
+ size += sizeof(unsigned int);
*((unsigned int *)(&msg[1] + size)) = threadid;
- pthread_mutex_lock(&(ndata->threadnotify));
+#ifdef RECOVERY
+ waitThreadMid = waitmid;
+ waitThreadID = threadid;
+#ifdef DEBUG
+ printf("%s -> This Thread is waiting for %s\n",__func__,midtoIPString(waitmid));
+#endif
+#endif
+
size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
- send_data(sock, msg, size);
+ pthread_mutex_lock(&(ndata->threadnotify));
+ send_data(psock, msg, size);
+#ifdef RECOVERY
+ send_data(bsock, msg, size);
+#endif
pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
pthread_mutex_unlock(&(ndata->threadnotify));
}
pthread_cond_destroy(&threadcond);
pthread_mutex_destroy(&threadnotify);
free(ndata);
- close(sock);
+ close(psock);
+
+#ifdef RECOVERY
+ close(bsock);
+#endif
+
return status;
}
void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
notifydata_t *ndata;
- int i, objIsFound = 0, index;
+ int i, objIsFound = 0, index = -1;
void *ptr;
+#ifdef DEBUG
+ printf("%s -> oid = %d vesion = %d tid = %d\n",__func__,oid,version,tid);
+#endif
//Look up the tid and call the corresponding pthread_cond_signal
if((ndata = notifyhashSearch(tid)) == NULL) {
} else {
for(i = 0; i < ndata->numoid; i++) {
if(ndata->oidarry[i] == oid) {
- objIsFound = 1;
- index = i;
+ objIsFound = 1;
+ index = i;
+
}
}
if(objIsFound == 0) {
printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
return;
- } else {
- if(version <= ndata->versionarry[index]) {
- printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
- return;
+ }
+ else {
+ if(version <= ndata->versionarry[index] && version >= 0) {
+ printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
+ return;
} else {
#ifdef CACHE
/* Clear from prefetch cache and free thread related data structure */
- if((ptr = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
- }
+ if((ptr = prehashSearch(oid)) != NULL) {
+ prehashRemove(oid);
+ }
#endif
- pthread_mutex_lock(&(ndata->threadnotify));
- pthread_cond_signal(&(ndata->threadcond));
- pthread_mutex_unlock(&(ndata->threadnotify));
+ pthread_mutex_lock(&(ndata->threadnotify));
+ pthread_cond_signal(&(ndata->threadcond));
+ pthread_mutex_unlock(&(ndata->threadnotify));
}
}
}
+
+#ifdef DEBUG
+ printf("%s -> Finished\n",__func__);
+#endif
return;
}
struct sockaddr_in remoteAddr;
char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
int sock, status, size, bytesSent;
+#ifdef DEBUG
+ printf("%s -> Entering \n",__func__);
+#endif
while(*head != NULL) {
ptr = *head;
+
mid = ptr->mid;
+#ifdef DEBUG
+ printf("%s -> trying to connect MID : %s\n",__func__,midtoIPString(mid));
+#endif
+
//create a socket connection to that machine
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("notifyAll():socket()");
fflush(stdout);
status = -1;
} else {
+#ifdef DEBUG
+ printf("%s -> connected\n",__func__);
+#endif
bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
msg[0] = THREAD_NOTIFY_RESPONSE;
*((unsigned int *)&msg[1]) = oid;
}
//close socket
close(sock);
+
// Update head
*head = ptr->next;
free(ptr);
+#ifdef DEBUG
+ printf("%s -> End notifying MID : %s\n",__func__,midtoIPString(mid));
+#endif
}
return status;
}
+
void transAbort() {
#ifdef ABORTREADERS
removetransactionhash();
tmp = pile;
//Add oid into a machine that is already present in the pile linked list structure
while(tmp != NULL) {
+// printf("tmp->mid = [%s], mid = [%s]\n", midtoIPString(tmp->mid), midtoIPString(mid));
if (tmp->mid == mid) {
int tmpsize;
tmp->numcreated++;
GETSIZE(tmpsize, headeraddr);
tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
- /*if(numHostsInSystem > 1) {
- STATUS(headeraddr) = DIRTY;
- //printf("Redo pInsert for oid %d, now modified\n", OID(headeraddr));
- //printf("this machine: %d\n", mid);
- midtoIP(tmp->mid, ip);
- pile = pInsert(tmp, headeraddr, locateBackupMachine(headeraddr), num_objs);
-
- // printf("header version: %d\n", headeraddr->version);
- //printf("Finished Redo pInsert for oid %d, now modified\n", OID(headeraddr));
- }*/
} else if (STATUS(headeraddr) & DIRTY) {
tmp->oidmod[tmp->nummod] = OID(headeraddr);
tmp->nummod++;
ptr->numcreated++;
GETSIZE(tmpsize, headeraddr);
ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
- /*if(numHostsInSystem > 1) {
- STATUS(headeraddr) = DIRTY;
- midtoIP(ptr->mid, ip);
-
- printf("np; ptr->mid: %s, oid: %d, header version: %d\n", ip, OID(headeraddr), headeraddr->version);
- //printf("header version: %d\n", headeraddr->version);
- pile = pInsert(tmp, headeraddr, locateBackupMachine(headeraddr), num_objs);
- //printf("header version: %d\n", headeraddr->version);
- }*/
} else if (STATUS(headeraddr) & DIRTY) {
ptr->oidmod[ptr->nummod] = OID(headeraddr);
ptr->nummod++;
GETSIZE(tmpsize, headeraddr);
ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
- //printf("Redo oid %d?\n", OID(headeraddr));
- /* midtoIP(ptr->mid, ip);
- printf("np; Redo? ptr->mid: %s, oid: %d, header version: %d\n", ip, OID(headeraddr), headeraddr->version);*/
} else {
*((unsigned int *)ptr->objread)=OID(headeraddr);
offset = sizeof(unsigned int);
return pileptr;
}
+#ifdef RECOVERY
/* Paxo Algorithm:
* Executes when the known leader has failed.
* Guarantees consensus on next leader among all live hosts. */
} while (ret == -1);
#ifdef DEBUG
- printf("\n>> Debug : Leader : [%s]\n", midtoIPString(leader));
+ printf("\n>> Debug : Leader : [%s]\t[%u]\n", midtoIPString(leader),leader);
#endif
return ret;
if(!liveHosts[i])
continue;
- if ((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
+ if ((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
printf("paxosPrepare(): socket create error\n");
continue;
}
#endif
send_data(sd, &control, sizeof(char));
send_data(sd, &my_n, sizeof(int));
- recv_data(sd, &control, sizeof(char));
- if ((sd == -1) || (timeoutFlag == 1)) {
+ int timeout = recv_data(sd, &control, sizeof(char));
+ if ((sd == -1) || (timeout < 0)) {
#ifdef DEBUG
printf("%s-> timeout to machine [%s]\n", __func__, midtoIPString(hostIpAddrs[i]));
#endif
- timeoutFlag = 0;
continue;
}
case PAXOS_PREPARE_REJECT:
break;
}
+
+ freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
}
#ifdef DEBUG
#ifdef DEBUG
printf("[Accept]...\n");
#endif
-
for (i = 0; i < numHostsInSystem; ++i) {
control = PAXOS_ACCEPT;
- if(!liveHosts[i])
+
+ if(!liveHosts[i])
continue;
- if ((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
+ if ((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
printf("paxosAccept(): socket create error\n");
continue;
}
send_data(sd, &my_n, sizeof(int));
send_data(sd, &remote_v, sizeof(int));
- recv_data(sd, &control, sizeof(char));
- if ((sd == -1) || (timeoutFlag == 1)) {
+ int timeout = recv_data(sd, &control, sizeof(char));
+ if ((sd == -1) || (timeout < 0)) {
#ifdef DEBUG
printf("%s-> timeout to machine [%s]\n", __func__, midtoIPString(hostIpAddrs[i]));
#endif
- timeoutFlag = 0;
continue;
}
#ifdef DEBUG
printf(">> Debug : Accept - n_h [%d], n_a [%d], v_a [%s]\n", n_h, n_a, midtoIPString(v_a));
#endif
+ freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
}
if (cnt >= (numLiveHostsInSystem / 2)) {
#endif
control = PAXOS_LEARN;
- // transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
for (i = 0; i < numHostsInSystem; ++i) {
if(!liveHosts[i])
#endif
continue;
}
- if ((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
+ if ((sd = getSockWithLock(transPrefetchSockPool, hostIpAddrs[i])) < 0) {
continue;
// printf("paxosLearn(): socket create error, attemp\n");
}
send_data(sd, &control, sizeof(char));
send_data(sd, &v_a, sizeof(int));
+
+ freeSockWithLock(transPrefetchSockPool,hostIpAddrs[i],sd);
+
}
//return v_a;
}
+
+
+/* If a thread-notification request is recorded (waitThreadID != -1) and the
+ * machine it waits on (waitThreadMid) is marked dead in liveHosts[], clear the
+ * notify lists of the awaited oids, signal the waiting thread's condition
+ * variable and reset waitThreadMid/waitThreadID to -1.
+ * Returns early, leaving both globals untouched, when notifyhashSearch() has
+ * no entry for waitThreadID.
+ * NOTE(review): the index returned by findHost() is used unchecked. */
+void clearDeadThreadsNotification()
+{
+#ifdef DEBUG
+  printf("%s -> Entered\n",__func__);
+#endif
+
+  /* nothing to do unless a notification request is outstanding */
+  if(waitThreadID != -1) {
+#ifdef DEBUG
+    printf("%s -> I was waitng for %s\n",__func__,midtoIPString(waitThreadMid));
+#endif
+    int hostIdx = findHost(waitThreadMid);
+
+    /* act only when the awaited machine is recorded as dead */
+    if(liveHosts[hostIdx] == 0) {
+      notifydata_t *pending = (notifydata_t*)notifyhashSearch(waitThreadID);
+      int k = 0;
+
+      if(pending == NULL) {
+        return;
+      }
+
+      /* drop the notify list of every object the thread was watching */
+      while(k < pending->numoid) {
+        clearNotifyList(pending->oidarry[k]);
+        k++;
+      }
+
+      /* wake the thread blocked on this request's condition variable */
+      pthread_mutex_lock(&(pending->threadnotify));
+      pthread_cond_signal(&(pending->threadcond));
+      pthread_mutex_unlock(&(pending->threadnotify));
+
+      waitThreadMid = -1;
+      waitThreadID = -1;
+    }
+  }
+
+#ifdef DEBUG
+  printf("%s -> Finished\n",__func__);
+#endif
+}
+
+/* Ask both the primary and the backup machine of an object to clear that
+ * object's notify list.
+ *
+ * oid: object id. Its home machine comes from lhashSearch(); the primary and
+ *      backup machines come from getPrimaryMachine()/getBackupMachine().
+ *
+ * Opens one TCP connection to each machine on LISTEN_PORT and sends a
+ * CLEAR_NOTIFY_LIST control byte followed by the oid; no reply is read.
+ * Nothing is returned: failures are only reported on stdout/stderr. If either
+ * socket cannot be created or connected, no message is sent to either machine.
+ * Every socket that was opened is closed on all paths, including error paths. */
+void reqClearNotifyList(unsigned int oid)
+{
+  int psock = -1, bsock = -1;   /* -1 marks "not opened" for the cleanup code */
+  int mid,pmid,bmid;
+  struct sockaddr_in premoteAddr, bremoteAddr;
+  char msg[1 + sizeof(unsigned int)];
+
+  if((mid = lhashSearch(oid)) == 0) {
+    printf("%s -> No such machine found for oid %x\n",__func__,oid);
+    return;
+  }
+
+  pmid = getPrimaryMachine(mid);
+  bmid = getBackupMachine(mid);
+
+  /* psock may already be open when the second socket() fails */
+  if((psock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ||
+     (bsock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+    perror("reqClearNotifyList() : socket()");
+    goto cleanup;
+  }
+
+  /* for primary machine */
+  bzero(&premoteAddr, sizeof(premoteAddr));
+  premoteAddr.sin_family = AF_INET;
+  premoteAddr.sin_port = htons(LISTEN_PORT);
+  premoteAddr.sin_addr.s_addr = htonl(pmid);
+
+  /* for backup machine */
+  bzero(&bremoteAddr, sizeof(bremoteAddr));
+  bremoteAddr.sin_family = AF_INET;
+  bremoteAddr.sin_port = htons(LISTEN_PORT);
+  bremoteAddr.sin_addr.s_addr = htonl(bmid);
+
+  /* send message to both the primary and the backup */
+  if((connect(psock, (struct sockaddr *)&premoteAddr, sizeof(premoteAddr)) < 0) ||
+     (connect(bsock, (struct sockaddr *)&bremoteAddr, sizeof(bremoteAddr)) < 0)) {
+    printf("%s -> error in connecting\n",__func__);
+    goto cleanup;
+  }
+
+  printf("%s -> Pmid = %s\n",__func__,midtoIPString(pmid));
+  printf("%s -> Bmid = %s\n",__func__,midtoIPString(bmid));
+
+  msg[0] = CLEAR_NOTIFY_LIST;
+  /* memcpy writes the same bytes as the former cast-and-store, without
+   * storing through an unaligned unsigned int pointer into a char buffer */
+  memcpy(&msg[1], &oid, sizeof(oid));
+
+  send_data(psock, msg, sizeof(msg));
+  send_data(bsock, msg, sizeof(msg));
+
+cleanup:
+  /* release whichever descriptors were actually opened */
+  if(psock >= 0)
+    close(psock);
+  if(bsock >= 0)
+    close(bsock);
+}
+
+
+/* Despite its name, this returns the liveness flag of machine mid: the value
+ * getStatus() reads from liveHosts[] for the index that findHost(mid) yields
+ * (getStatus's DEBUG output labels the value 1 as LIVE).
+ * The index from findHost() is passed on unchecked. */
+int checkiftheMachineDead(unsigned int mid) {
+  return getStatus(findHost(mid));
+}
+
+#endif