From: adash Date: Wed, 20 Jun 2007 13:02:45 +0000 (+0000) Subject: Fixed bugs X-Git-Tag: preEdgeChange~546 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=607e14cd1894d596daaaa19248edd422c76e3cb0;p=IRC.git Fixed bugs a)changed code to retry commit after random delay (soft abort case) b)fixed sending n copies of TRANS_ABORT to all threads( was only sent once before) c)separate multithreaded from single thread functionality(was mixed before) d)tested for all testcases Working version for TRANS_COMMIT and TRANS_ABORT --- diff --git a/Robust/src/Runtime/DSTM/interface/Makefile b/Robust/src/Runtime/DSTM/interface/Makefile index 2f841f28..f01dcef9 100644 --- a/Robust/src/Runtime/DSTM/interface/Makefile +++ b/Robust/src/Runtime/DSTM/interface/Makefile @@ -1,22 +1,22 @@ -d-2: - gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c +d-3: + gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c demsky: - gcc -DDEBUG -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c + gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c -d-1: - gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c +d-4: + gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c all: - gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c - gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c - gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c + gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c + gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c + gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c mac: - gcc -DMAC -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c - gcc -DMAC -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c - gcc -DMAC -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c + gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c + gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c + gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c clean: - rm -rf d-2 d-1 demsky + rm -rf d-3 d-4 demsky diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index de0af029..d46d47fb 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -13,7 +13,6 @@ #define TRANS_REQUEST 5 #define TRANS_ABORT 6 #define TRANS_COMMIT 7 -#define TRANS_ABORT_BUT_RETRY_COMMIT 8 #define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING 9 //Participant Messages @@ -31,9 +30,6 @@ #define OBJ_LOCKED_BUT_VERSION_MATCH 14 #define OBJ_UNLOCK_BUT_VERSION_MATCH 15 #define VERSION_NO_MATCH 16 -//TODO REMOVE THIS -#define NO_MISSING_OIDS 22 -#define MISSING_OIDS_PRESENT 23 #include @@ -67,13 +63,6 @@ typedef struct transrecord { objstr_t *cache; chashtable_t *lookupTable; } transrecord_t; -/* -typedef struct pile { - unsigned int mid; - unsigned int oid; - struct pile *next; -}pile_t; -*/ // Structure that keeps track of responses from the participants typedef struct thread_response { char rcv_status; @@ -108,9 +97,14 @@ typedef struct thread_data_array { pthread_cond_t *threshold; //threshhold for waking up a thread pthread_mutex_t *lock; //lock the count variable int *count; //variable to count responses of TRANS_REQUEST protocol from all participants + char *replyctrl; //shared ctrl message that stores the reply to be sent, filled by decideResp + char *replyretry; //shared variable to find out if we need retry (TRANS_COMMIT case) transrecord_t *rec; // To send modified objects }thread_data_array_t; + + + // Structure to save information about an oid necesaary for the decideControl() typedef struct objinfo { unsigned int oid; @@ -152,7 +146,8 @@ char handleTransReq(int, fixed_data_t *, trans_commit_data_t *, unsigned int *, transrecord_t *transStart(); objheader_t *transRead(transrecord_t *record, unsigned int oid); objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid -int decideResponse(thread_data_array_t *tdata, int sd, int status);// Coordinator decides what response to send to the participant +int decideResponse(thread_data_array_t *tdata);// Coordinator decides what response to send to the participant +char sendResponse(thread_data_array_t *tdata, int sd); //Sends control message back to Participants void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins int transCommit(transrecord_t *record); //return 0 if successful void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 34487ff0..9590a1c9 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -97,6 +97,9 @@ void *dstmAccept(void *acceptfd) printf("Recieved connection: fd = %d\n", (int)acceptfd); if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) { + if (retval == 0) { + return; // Testing connection + } perror("Error in receiving control from coordinator\n"); return; } @@ -106,7 +109,6 @@ void *dstmAccept(void *acceptfd) perror("Error receiving object from cooridnator\n"); return; } - printf("DEBUG -> Recv READ_REQUEST from Coordinator for oid = %d\n", oid); srcObj = mhashSearch(oid); h = (objheader_t *) srcObj; size = sizeof(objheader_t) + sizeof(classsize[h->type]); @@ -141,7 +143,7 @@ void *dstmAccept(void *acceptfd) break; case TRANS_REQUEST: - printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n"); + printf("DEBUG -> Recv TRANS_REQUEST\n"); if((val = readClientReq((int)acceptfd, &transinfo)) != 0) { printf("Error in readClientReq\n"); } @@ -156,9 +158,9 @@ void *dstmAccept(void *acceptfd) printf("Closed connection: fd = %d\n", (int)acceptfd); pthread_exit(NULL); - printf("DEBUG -> Exiting dstmAccept\n"); } +// Reads transaction request per thread int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { char *ptr, control, prevctrl, sendctrl, newctrl; void *modptr, *header; @@ -191,7 +193,8 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { int numread = fixed.numread; N = numread * (sizeof(unsigned int) + sizeof(short)); char objread[N]; - if(numread != 0) { // If pile contains objects to be read + if(numread != 0) { //If pile contains more than one object to be read, + // keep reading all objects sum = 0; do { n = recv((int)acceptfd, (void *) objread, N, 0); @@ -200,9 +203,10 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { } // Read modified objects - if(fixed.nummod != 0) { // If pile contains modified objects + if(fixed.nummod != 0) { // If pile contains more than one modified object, + // allocate new object store and recv all modified objects if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) { - printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__); + printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__); return 1; } sum = 0; @@ -212,26 +216,20 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { } while (sum < fixed.sum_bytes && n != 0); } + // Process the information available in the TRANS_REQUEST control //Send control message as per all votes from all oids in the machine if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) { - printf("Handle req error\n"); + printf("Handle Trans Req error %s, %d\n", __FILE__, __LINE__); + return 1; } - //Read for new control message from Coordiator if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) { - perror("Error in receiving control message"); + perror("Error in receiving control message\n"); return 1; } switch(control) { case TRANS_ABORT: - printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n"); - //send ack to coordinator - sendctrl = TRANS_SUCESSFUL; - if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ACK to coordinator\n"); - return 1; - } //Mark all ref counts as 1 and do garbage collection ptr = modptr; for(i = 0; i< fixed.nummod; i++) { @@ -241,66 +239,31 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { } //Unlock objects that was locked in this machine due to this transaction for(i = 0; i< transinfo->numlocked; i++) { + printf("DEBUG-> Unlocking objects\n"); header = mhashSearch(transinfo->objlocked[i]);// find the header address ((objheader_t *)header)->status &= ~(LOCK); } + + //send ack to coordinator + printf("DEBUG -> Recv TRANS_ABORT\n"); + sendctrl = TRANS_SUCESSFUL; + if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { + perror("Error sending ACK to coordinator\n"); + return 1; + } + ptr = NULL; return 0; case TRANS_COMMIT: - printf("DEBUG -> Recv TRANS_COMMIT from Coordinator accept_fd = %d\n", acceptfd); + printf("DEBUG -> Recv TRANS_COMMIT \n"); if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) { printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__); } break; - case TRANS_ABORT_BUT_RETRY_COMMIT: - printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n"); - //Process again after waiting for sometime and on prev control message sent - sleep(2); - switch(prevctrl) { - case TRANS_AGREE: - sendctrl = TRANS_AGREE; - if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ACK to coordinator\n"); - } - break; - case TRANS_SOFT_ABORT: - if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) { - printf("Handle req error\n"); - } - //If no change in previous control message that was sent then ABORT transaction - if(newctrl == TRANS_SOFT_ABORT){ - //Send ABORT - newctrl = TRANS_DISAGREE; - if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ACK to coordinator\n"); - } - //Set the reference count of the object to 1 in mainstore for garbage collection - ptr = modptr; - for(i = 0; i< fixed.nummod; i++) { - tmp_header = (objheader_t *) ptr; - tmp_header->rcount = 1; - ptr += sizeof(objheader_t) + classsize[tmp_header->type]; - } - //Unlock objects that was locked in this machine due to this transaction - for(i = 0; i< transinfo->numlocked; i++) { - ptr = mhashSearch(transinfo->objlocked[i]);// find the header address - ((objheader_t *)ptr)->status &= ~(LOCK); - } - } else if(newctrl == TRANS_AGREE) { - newctrl = TRANS_AGREE; - //Send new control message - if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ACK to coordinator\n"); - } - } - - break; - } - break; case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING: //TODO expect another transrequest from client - printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n"); + printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n"); break; default: printf("No response to TRANS_AGREE OR DISAGREE protocol\n"); @@ -308,7 +271,7 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { break; } //Free memory - printf("DEBUG -> Freeing..."); + printf("DEBUG -> Freeing...\n"); fflush(stdout); if (transinfo->objmod != NULL) { free(transinfo->objmod); @@ -325,8 +288,8 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { return 0; } -//This function runs a decision after all objects are weighed under one of the 4 possibilities -//and returns the appropriate control message to the Ccordinator +//This function runs a decision after all objects involved in TRANS_REQUEST +//and returns the appropriate control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT to the Ccordinator char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) { int val; short version; @@ -339,6 +302,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int)); // Counters and arrays to formulate decision on control message to be sent + // version match or no match int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; int objmodnotfound = 0, nummodfound = 0; void *mobj; @@ -346,9 +310,9 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran //Process each object present in the pile ptr = modptr; - //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread); - fflush(stdout); - //Process each oid in the machine pile/ group + + //Process each oid in the machine pile/ group per thread + //Should be a new function for (i = 0; i < fixed->numread + fixed->nummod; i++) { if (i < fixed->numread) {//Object is read int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array @@ -373,6 +337,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran //Check if obj is locked if ((((objheader_t *)mobj)->status & LOCK) == LOCK) { if (version == ((objheader_t *)mobj)->version) { // If version match + printf("DEBUG -> obj = %d locked\n", ((objheader_t *)mobj)->oid); v_matchlock++; } else {//If versions don't match ..HARD ABORT v_nomatch++; @@ -382,16 +347,16 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran perror("Error in sending control to the Coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_DISAGREE\n"); + printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd); return control; } } else {//Obj is not locked , so lock object ((objheader_t *)mobj)->status |= LOCK; - //FOR TESTING + // TESTING Add sleep to make transactions run for a long time such that + // we can test for soft abort case sleep(1); //Save all object oids that are locked on this machine during this transaction request call oidlocked[objlocked] = ((objheader_t *)mobj)->oid; - printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid); objlocked++; if (version == ((objheader_t *)mobj)->version) { //If versions match v_matchnolock++; @@ -409,16 +374,9 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran } } } - - printf("No of objs locked = %d\n", objlocked); - printf("No of v_nomatch = %d\n", v_nomatch); - printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock); - printf("No of objs v_match but had locks before = %d\n", v_matchlock); - printf("No of objs not found = %d\n", objnotfound); - printf("No of objs modified but not found = %d\n", objmodnotfound); - + //Decide what control message(s) to send - //Cond to send TRANS_AGREE + // Should be a new function if(v_matchnolock == fixed->numread + fixed->nummod) { //send TRANS_AGREE to Coordinator control = TRANS_AGREE; @@ -430,19 +388,19 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran } //Condition to send TRANS_SOFT_ABORT if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { - //send TRANS_SOFT_ABORT to Coordinator control = TRANS_SOFT_ABORT; char msg[]={TRANS_SOFT_ABORT, 0,0,0,0}; - *((int*)&msg[1])=objnotfound; + *((int*)&msg[1])= objnotfound; printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); - //send number of oids not found and the missing oids + //Sending control message if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) { perror("Error in sending no of objects that are not found\n"); return 0; } + //send number of oids not found and the missing oids if objects are missing in the machine if(objnotfound != 0) { - int size=sizeof(unsigned int)*objnotfound; + int size = sizeof(unsigned int)*objnotfound; if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) { perror("Error in sending objects that are not found\n"); return 0; @@ -478,7 +436,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran return control; } -//Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back +//Process oids in the TRANS_COMMIT requested by the participant and sends an ACK back to Coordinator int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { objheader_t *header; int i = 0, offset = 0; @@ -486,7 +444,7 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { //Process each modified object saved in the mainobject store for(i=0; inummod; i++) { if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) { - printf("mhashserach returns NULL\n"); + printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); } //change reference count of older address and free space in objstr ?? header->rcount = 1; //Not sure what would be th val @@ -509,8 +467,7 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { //send ack to coordinator control = TRANS_SUCESSFUL; - //FOR TESTING - printf("DEBUG-> Transaction is SUCCESSFUL \n"); + printf("DEBUG-> TRANS_SUCESSFUL\n"); if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ACK to coordinator\n"); } diff --git a/Robust/src/Runtime/DSTM/interface/ip.c b/Robust/src/Runtime/DSTM/interface/ip.c index e15e89bb..d959f49c 100644 --- a/Robust/src/Runtime/DSTM/interface/ip.c +++ b/Robust/src/Runtime/DSTM/interface/ip.c @@ -51,7 +51,6 @@ int checkServer(int mid, char *machineip) { while (connect(tmpsd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { sleep(1); } - printf("DEBUG -> Connection established with %s\n", machineip); close(tmpsd); return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/testserver.c b/Robust/src/Runtime/DSTM/interface/testserver.c index c8cd19c3..61e397db 100644 --- a/Robust/src/Runtime/DSTM/interface/testserver.c +++ b/Robust/src/Runtime/DSTM/interface/testserver.c @@ -45,7 +45,7 @@ void init_obj(objheader_t *h, unsigned int oid, unsigned short type, \ int main() { //test1(); - //test3(); +// test3(); test4(); } @@ -184,24 +184,23 @@ int test3() { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu + mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu lhashInsert(20, mid); lhashInsert(21, mid); lhashInsert(22, mid); - mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu + mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu //Inserting into lhashtable lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); pthread_create(&thread_Listen, &attr, dstmListen, NULL); - // pthread_create(&thread_Listen, NULL, dstmListen, NULL); - //Check if machine d-1 is up and running - checkServer(mid, "128.200.9.26"); - mid = iptoMid("128.200.9.27"); - //Check if machine d-2 is up and running - checkServer(mid, "128.200.9.27"); + //Check if machine d-4 is up and running + checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.200.9.29"); + //Check if machine d-3 is up and running + checkServer(mid, "128.200.9.29"); // Start Transaction myTrans = transStart(); @@ -277,46 +276,45 @@ int test4() { lhashInsert(header->oid, mid); //Inserting into lhashtable - mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu + mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu lhashInsert(20, mid); lhashInsert(21, mid); lhashInsert(22, mid); - mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu + mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu //Inserting into lhashtable lhashInsert(31, mid); lhashInsert(32, mid); lhashInsert(33, mid); - pthread_create(&thread_Listen, &attr, dstmListen, NULL); - // pthread_create(&thread_Listen, NULL, dstmListen, NULL); - //Check if machine d-1 is up and running - checkServer(mid, "128.200.9.26"); - mid = iptoMid("128.200.9.27"); - //Check if machine d-2 is up and running - checkServer(mid, "128.200.9.27"); + pthread_create(&thread_Listen, &attr, dstmListen, NULL); + //Check if machine d-4 is up and running + checkServer(mid, "128.200.9.30"); + mid = iptoMid("128.200.9.29"); + //Check if machine d-3 is up and running + checkServer(mid, "128.200.9.29"); // Start Transaction myTrans = transStart(); //read object 1(present in local machine) - if((h1 = transRead(myTrans, 1)) == NULL){ + if((h1 = transRead(myTrans, 2)) == NULL){ printf("Object not found\n"); } + //read object 2present in local machine) - if((h2 = transRead(myTrans, 2)) == NULL) { + if((h2 = transRead(myTrans, 1)) == NULL) { printf("Object not found\n"); } - //read object 31(present in d-1 machine) + //read object 31(present in d-4 machine) if((h3 = transRead(myTrans, 31)) == NULL) { printf("Object not found\n"); } - - //read object 21(present in d-2 machine) + //read object 21(present in d-3 machine) if((h4 = transRead(myTrans, 21)) == NULL) { printf("Object not found\n"); } - + // Commit transaction transCommit(myTrans); @@ -350,7 +348,6 @@ int test5() { lhashInsert(32, mid); lhashInsert(33, mid); pthread_create(&thread_Listen, &attr, dstmListen, NULL); -// pthread_create(&thread_Listen, NULL, dstmListen, NULL); printf("DEBUG -> mid = %d\n", mid); checkServer(mid, "128.200.9.26"); diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 0af91b9d..e9daecf7 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -9,6 +9,9 @@ #include #include #include +#include +#include +#include #define LISTEN_PORT 2156 #define MACHINE_IP "127.0.0.1" @@ -16,6 +19,18 @@ extern int classsize[]; +void randomdelay(void) +{ + struct timespec req, rem; + time_t t; + + t = time(NULL); + req.tv_sec = 0; + req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec + nanosleep(&req, &rem); + return; +} + transrecord_t *transStart() { transrecord_t *tmp = malloc(sizeof(transrecord_t)); @@ -26,7 +41,6 @@ transrecord_t *transStart() objheader_t *transRead(transrecord_t *record, unsigned int oid) { - printf("Enter TRANS_READ\n"); unsigned int machinenumber; objheader_t *tmp, *objheader; void *objcopy; @@ -34,7 +48,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) void *buf; //check cache if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ - printf("DEBUG -> transRead oid %d found local\n", oid); + //printf("DEBUG -> transRead oid %d found local\n", oid); return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { //Look up in Machine lookup table and found @@ -55,11 +69,11 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { //If object is not found in Remote location - printf("Object oid = %d not found in Machine %d\n", oid, machinenumber); + //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber); return NULL; } else { - printf("Object oid = %d found in Machine %d\n", oid, machinenumber); + //printf("Object oid = %d found in Machine %d\n", oid, machinenumber); return(objcopy); } } @@ -77,127 +91,105 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) chashInsert(record->lookupTable, tmp->oid, tmp); return tmp; } -//int decideResponse(thread_data_array_t *tdata, char *control, int sd) { -int decideResponse(thread_data_array_t *tdata, int sd, int val) { - int i, n, N, sum, retval, oidcount = 0; - int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0; - char ctrl, control, *ptr; - unsigned int *oidnotfound; - objheader_t *header; - printf("DEBUG -> pilecount is %d\n", tdata->pilecount); +//This function decides the reponse that needs to be sent to all other machines involved in a +//transaction by the machine that initiated the transaction request + +int decideResponse(thread_data_array_t *tdata) { + char control; + int i, transagree = 0, transdisagree = 0, transsoftabort = 0; + //Check common data structure for (i = 0 ; i < tdata->pilecount ; i++) { //Switch case control = tdata->recvmsg[i].rcv_status; switch(control) { case TRANS_DISAGREE: - printf("DEBUG-> Inside TRANS_DISAGREE\n"); + printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n"); transdisagree++; - //Free transaction records - objstrDelete(tdata->rec->cache); - chashDelete(tdata->rec->lookupTable); - free(tdata->rec); - //send Abort - ctrl = TRANS_ABORT; - for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved - if (send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ctrl message for participant\n"); - return 1; - } - } - return 0; + break; case TRANS_AGREE: - printf("DEBUG-> Inside TRANS_AGREE\n"); - PRINT_TID(tdata); + printf("DEBUG-> trans.c Recv TRANS_AGREE\n"); transagree++; break; case TRANS_SOFT_ABORT: - printf("DEBUG-> Inside TRANS_SOFT_ABORT\n"); + printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n"); transsoftabort++; - /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */ - if ((i == tdata->thread_id) && (val == 0)) { - //Read list of objects missing - if(read(sd, &oidcount, sizeof(int)) != 0) { - //Break if only objs are locked at the Participant side - if (oidcount == 0) { - break; - } - transsoftabortmiss++; - N = oidcount * sizeof(unsigned int); - if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - } - ptr = (char *) oidnotfound; - do { - n = read(sd, ptr+sum, N-sum); - sum += n; - } while(sum < N && n !=0); - } - } - break; default: - printf("Participant sent unknown message\n"); + printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); + return -1; } } //Decide what control message to send to Participant - if(transagree == tdata->pilecount){ + if(transdisagree > 0) { + //Send Abort + *(tdata->replyctrl) = TRANS_ABORT; + printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); + objstrDelete(tdata->rec->cache); + chashDelete(tdata->rec->lookupTable); + free(tdata->rec); + } else if(transagree == tdata->pilecount){ //Send Commit - ctrl = TRANS_COMMIT; - printf("Sending TRANS_COMMIT\n"); - if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) { - perror("Error sending ctrl message for participant\n"); - return 1; - } - //printf("Sending control %d ,sd = %d\n", ctrl, i, sd); + *(tdata->replyctrl) = TRANS_COMMIT; + printf("DEBUG-> trans.c Sending TRANS_COMMIT\n"); + objstrDelete(tdata->rec->cache); + chashDelete(tdata->rec->lookupTable); + free(tdata->rec); + } else if(transsoftabort > 0 && transdisagree == 0) { + //Send Abort + *(tdata->replyctrl) = TRANS_ABORT; + *(tdata->replyretry) = 1; + //objstrDelete(tdata->rec->cache); + //chashDelete(tdata->rec->lookupTable); + //free(tdata->rec); + printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); + } else { + printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__); + return -1; } + + return 0; +} +//This function sends the final response to all threads in their respective socket id +char sendResponse(thread_data_array_t *tdata, int sd) { + int n, N, sum, oidcount = 0; + char *ptr, retval = 0; + unsigned int *oidnotfound; - if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) { - //Send abort but retry commit - ctrl = TRANS_ABORT_BUT_RETRY_COMMIT; - printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n"); - if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) { - perror("Error sending ctrl message for participant\n"); - return 1; - } - /* - //Sleep and the resend the request - sleep(2); - //Read new control message from Participant - - if((n = read(sd, &control, sizeof(char))) <= 0) { - perror("No bytes are read for participant\n"); - return 1; + //If the decided response is due to a soft abort and missing objects at the Participant's side + if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) { + //Read list of objects missing + if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) { + //Break if only objs are locked at the Participant side + N = oidcount * sizeof(unsigned int); + if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + } + ptr = (char *) oidnotfound; + do { + n = read(sd, ptr+sum, N-sum); + sum += n; + } while(sum < N && n !=0); } - - //Update common data structure and increment count - tdata->recvmsg[tdata->thread_id].rcv_status = control; - val = 1; - decideResponse(tdata, sd, val); //Second call to decideResponse(); indicated by parameter val = 1 - */ + retval = TRANS_SOFT_ABORT; } - - if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) { - //Send abort but retry commit after relooking up objects - ctrl = TRANS_ABORT; - printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n"); - if((retval = send(sd, &ctrl, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { - perror("Error sending ctrl message for participant\n"); - return 1; - } - //TODO - //Relook up objects - //update location table - - //Free pointers - free(oidnotfound); + //If the decided response is TRANS_ABORT + if(*(tdata->replyctrl) == TRANS_ABORT) { + retval = TRANS_ABORT; } - - return 0; + if(*(tdata->replyctrl) == TRANS_COMMIT) { + retval = TRANS_COMMIT; + } + // Send response to the Participant + if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) { + perror("Error sending ctrl message for participant\n"); + } + + return retval; } void *transRequest(void *threadarg) { @@ -206,54 +198,46 @@ void *transRequest(void *threadarg) { struct hostent *server; thread_data_array_t *tdata; objheader_t *headeraddr; - //unsigned int *oidnotfound; char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; - char machineip[16]; + char machineip[16], retval; tdata = (thread_data_array_t *) threadarg; //Send Trans Request if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("Error in socket for TRANS_REQUEST"); + perror("Error in socket for TRANS_REQUEST\n"); return NULL; } bzero((char*) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(LISTEN_PORT); - //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); midtoIP(tdata->mid,machineip); machineip[15] = '\0'; serv_addr.sin_addr.s_addr = inet_addr(machineip); if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("Error in connect for TRANS_REQUEST"); + perror("Error in connect for TRANS_REQUEST\n"); return NULL; } - - //Multiple writes for sending packets of data - //Send first few fixed bytes of the TRANS_REQUEST protocol - printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control); -// printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t)); -// printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes); + + printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip); + //Send bytes of data with TRANS_REQUEST control message if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) { - perror("Error sending fixed bytes for thread"); + perror("Error sending fixed bytes for thread\n"); return NULL; } //Send list of machines involved in the transaction -// printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount); { int size=sizeof(unsigned int)*tdata->pilecount; if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) { - perror("Error sending list of machines for thread"); + perror("Error sending list of machines for thread\n"); return NULL; } } //Send oids and version number tuples for objects that are read -// printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread); -// printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); { int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread; if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) { - perror("Error sending tuples for thread"); + perror("Error sending tuples for thread\n"); return NULL; } } @@ -263,11 +247,11 @@ void *transRequest(void *threadarg) { headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); size=sizeof(objheader_t)+classsize[headeraddr->type]; if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) { - perror("Error sending obj modified for thread"); + perror("Error sending obj modified for thread\n"); return NULL; } } - + //Read message control message from participant side if((n = read(sd, &control, sizeof(char))) <= 0) { perror("Error in reading control message from Participant\n"); @@ -280,22 +264,29 @@ void *transRequest(void *threadarg) { //Lock and update count //Thread sleeps until all messages from pariticipants are received by coordinator pthread_mutex_lock(tdata->lock); - (*(tdata->count))++; + + (*(tdata->count))++; if(*(tdata->count) == tdata->pilecount) { + if (decideResponse(tdata) != 0) { + printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); + pthread_mutex_unlock(tdata->lock); + close(sd); + return NULL; + } pthread_cond_broadcast(tdata->threshold); } else { pthread_cond_wait(tdata->threshold, tdata->lock); } - //process the participant's request - if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0 - printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); + pthread_mutex_unlock(tdata->lock); + + if (sendResponse(tdata, sd) == 0) { + printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); pthread_mutex_unlock(tdata->lock); + close(sd); return NULL; } - pthread_mutex_unlock(tdata->lock); - close(sd); pthread_exit(NULL); } @@ -312,6 +303,7 @@ int transCommit(transrecord_t *record) { char transid[TID_LEN]; static int newtid = 0; trans_req_data_t *tosend; + char treplyctrl = 0, treplyretry = 0; //keep track of the common response that needs to be sent ptr = record->lookupTable->table; size = record->lookupTable->size; @@ -328,14 +320,12 @@ int transCommit(transrecord_t *record) { //Get machine location for object id if ((machinenum = lhashSearch(curr->key)) == 0) { - printf("Error: No such machine\n"); + printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); return 1; } - //TODO only for debug - //machinenum = 1; if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { - printf("Error: No such oid\n"); + printf("Error: No such oid %s, %d\n", __FILE__, __LINE__); return 1; } //Make machine groups @@ -378,6 +368,7 @@ int transCommit(transrecord_t *record) { pListMid(pile, listmid); //Process each machine group + //Should be a new function for while loop while(tmp != NULL) { //Create transaction id newtid++; @@ -403,6 +394,8 @@ int transCommit(transrecord_t *record) { thread_data_array[numthreads].threshold = &tcond; thread_data_array[numthreads].lock = &tlock; thread_data_array[numthreads].count = &trecvcount; + thread_data_array[numthreads].replyctrl = &treplyctrl; + thread_data_array[numthreads].replyretry = &treplyretry; thread_data_array[numthreads].rec = record; rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]); @@ -410,10 +403,8 @@ int transCommit(transrecord_t *record) { perror("Error in pthread create"); return 1; } - numthreads++; //TODO frees - //free(tosend); tmp = tmp->next; } @@ -431,11 +422,18 @@ int transCommit(transrecord_t *record) { //Free resources pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); -// for(i = 0 ;i< pilecount ;i++) { - free(tosend); -// } + + free(tosend); free(listmid); pDelete(pile); + if(treplyretry == 1) { + //wait a random amount of time + randomdelay(); + //sleep(1); + //Retry the commiting transaction again + transCommit(record); + } + return 0; } @@ -499,7 +497,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { chashInsert(record->lookupTable, oid, objcopy); break; default: - printf("Error in recv request from participant on a READ_REQUEST\n"); + printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__); return NULL; } close(sd);