From 259dc92f4ace0e9836723cfd99d6aa386fd66e9f Mon Sep 17 00:00:00 2001 From: adash Date: Thu, 21 Jun 2007 16:30:40 +0000 Subject: [PATCH] Clean up code split large code into small functions --- Robust/src/Runtime/DSTM/interface/dstm.h | 18 +- .../src/Runtime/DSTM/interface/dstmserver.c | 255 +++++---- Robust/src/Runtime/DSTM/interface/trans.c | 524 ++++++++++-------- 3 files changed, 438 insertions(+), 359 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index d46d47fb..1ec1b440 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -138,20 +138,22 @@ void *objstrAlloc(objstr_t *store, unsigned int size); //size in bytes /* Prototypes for server portion */ void *dstmListen(); void *dstmAccept(void *); -int readClientReq(int, trans_commit_data_t *); -char handleTransReq(int, fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *); +int readClientReq(trans_commit_data_t *, int); +int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, int); +char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int); +int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, unsigned int *, int); +int transCommitProcess(trans_commit_data_t *, int); /* end server portion */ /* Prototypes for transactions */ transrecord_t *transStart(); -objheader_t *transRead(transrecord_t *record, unsigned int oid); -objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid -int decideResponse(thread_data_array_t *tdata);// Coordinator decides what response to send to the participant -char sendResponse(thread_data_array_t *tdata, int sd); //Sends control message back to Participants -void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins +objheader_t *transRead(transrecord_t *, unsigned int); +objheader_t *transCreateObj(transrecord_t *, unsigned short); //returns oid int transCommit(transrecord_t *record); //return 0 if successful +void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins +int decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant +char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); -int transCommitProcess(trans_commit_data_t *, int); /* end transactions */ void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 9590a1c9..52235800 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -1,3 +1,6 @@ +/* Coordinator => Machine that initiates the transaction request call for commiting a transaction + * Participant => Machines that host the objects involved in a transaction commit */ + #include #include #include @@ -18,8 +21,9 @@ objstr_t *mainobjstore; int dstmInit(void) { - //Initialize main object store + /* Initialize main object store */ mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); + /* Create machine lookup table and location lookup table */ if (mhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure @@ -82,7 +86,8 @@ void *dstmListen() } pthread_exit(NULL); } - +/* This function accepts a new connection request, decodes the control message in the connection + * and accordingly calls other functions to process new requests */ void *dstmAccept(void *acceptfd) { int numbytes,i, val, retval; @@ -96,6 +101,7 @@ void *dstmAccept(void *acceptfd) int fd_flags = fcntl((int)acceptfd, F_GETFD), size; printf("Recieved connection: fd = %d\n", (int)acceptfd); + /* Receive control messages from other machines */ if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) { if (retval == 0) { return; // Testing connection @@ -103,11 +109,13 @@ void *dstmAccept(void *acceptfd) perror("Error in receiving control from coordinator\n"); return; } + switch(control) { case READ_REQUEST: + /* Read oid requested and search if available */ if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) { perror("Error receiving object from cooridnator\n"); - return; + return NULL; } srcObj = mhashSearch(oid); h = (objheader_t *) srcObj; @@ -116,16 +124,19 @@ void *dstmAccept(void *acceptfd) ctrl = OBJECT_NOT_FOUND; if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending control msg to coordinator\n"); + return NULL; } } else { /* Type */ char msg[]={OBJECT_FOUND, 0, 0, 0, 0}; *((int *)&msg[1])=size; if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) { - perror("Error sending size of object to coordinator\n"); + perror("Error sending size of object to coordinator\n"); + return NULL; } if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) { - perror("Error in sending object\n"); + perror("Error in sending object\n"); + return NULL; } } break; @@ -143,15 +154,19 @@ void *dstmAccept(void *acceptfd) break; case TRANS_REQUEST: + /* Read transaction request */ printf("DEBUG -> Recv TRANS_REQUEST\n"); - if((val = readClientReq((int)acceptfd, &transinfo)) != 0) { + if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { printf("Error in readClientReq\n"); + return; } break; default: printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control); } + + /* Close connection */ if (close((int)acceptfd) == -1) perror("close"); else @@ -160,16 +175,15 @@ void *dstmAccept(void *acceptfd) pthread_exit(NULL); } -// Reads transaction request per thread -int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { - char *ptr, control, prevctrl, sendctrl, newctrl; - void *modptr, *header; - objheader_t *tmp_header; +/* This function reads the information available in a transaction request + * and makes a function call to process the request */ +int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { + char *ptr; + void *modptr; fixed_data_t fixed; - int sum = 0, i, N, n, val, retval; + int sum = 0, i, N, n, val; - //Reads to process the TRANS_REQUEST protocol further - // Read fixed_data + /* Read fixed_data_t data structure */ N = sizeof(fixed) - 1; ptr = (char *)&fixed;; fixed.control = TRANS_REQUEST; @@ -178,7 +192,7 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { sum += n; } while(sum < N && n != 0); - // Read list of mids + /* Read list of mids */ int mcount = fixed.mcount; N = mcount * sizeof(unsigned int); unsigned int listmid[mcount]; @@ -189,7 +203,7 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { sum += n; } while(sum < N && n != 0); - // Read oid and version tuples + /* Read oid and version tuples for those objects that are not modified in the transaction */ int numread = fixed.numread; N = numread * (sizeof(unsigned int) + sizeof(short)); char objread[N]; @@ -202,62 +216,84 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { } while(sum < N && n != 0); } - // Read modified objects + /* Read modified objects */ if(fixed.nummod != 0) { // If pile contains more than one modified object, // allocate new object store and recv all modified objects + // TODO deallocate this space if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) { printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__); return 1; } sum = 0; - do { // Recv the objs that are modified at Coordinator + do { // Recv the objs that are modified by the Coordinator n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0); sum += n; } while (sum < fixed.sum_bytes && n != 0); } - // Process the information available in the TRANS_REQUEST control - //Send control message as per all votes from all oids in the machine - if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) { + /*Process the information read */ + if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, acceptfd)) != 0) { + printf("Error in processClientReq %s, %d\n", __FILE__, __LINE__); + return 1; + } + + return 0; +} + +/* This function processes the Coordinator's transaction request using "handleTransReq" + * function and sends a reply to the co-ordinator. + * Following this it also receives a new control message from the co-ordinator and processes this message*/ +int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, + unsigned int *listmid, char *objread, void *modptr, int acceptfd) { + char *ptr, control, sendctrl; + objheader_t *tmp_header; + void *header; + int i = 0, val, retval; + + /* Send reply to the Coordinator */ + if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) { printf("Handle Trans Req error %s, %d\n", __FILE__, __LINE__); return 1; } - //Read for new control message from Coordiator + /* Read new control message from Coordiator */ if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) { perror("Error in receiving control message\n"); return 1; } + /* Process the new control message */ switch(control) { case TRANS_ABORT: - //Mark all ref counts as 1 and do garbage collection + /* Set all ref counts as 1 and do garbage collection */ ptr = modptr; - for(i = 0; i< fixed.nummod; i++) { + for(i = 0; i< fixed->nummod; i++) { tmp_header = (objheader_t *)ptr; tmp_header->rcount = 1; ptr += sizeof(objheader_t) + classsize[tmp_header->type]; } - //Unlock objects that was locked in this machine due to this transaction + /* Unlock objects that was locked due to this transaction */ for(i = 0; i< transinfo->numlocked; i++) { - printf("DEBUG-> Unlocking objects\n"); header = mhashSearch(transinfo->objlocked[i]);// find the header address ((objheader_t *)header)->status &= ~(LOCK); } - //send ack to coordinator + /* Send ack to Coordinator */ printf("DEBUG -> Recv TRANS_ABORT\n"); sendctrl = TRANS_SUCESSFUL; if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ACK to coordinator\n"); return 1; } - ptr = NULL; - return 0; + // return 0; + break; + case TRANS_COMMIT: + /* Invoke the transCommit process() */ printf("DEBUG -> Recv TRANS_COMMIT \n"); if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) { printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__); + return 1; } break; @@ -270,7 +306,7 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { //TODO Use fixed.trans_id TID since Client may have died break; } - //Free memory + /* Free memory */ printf("DEBUG -> Freeing...\n"); fflush(stdout); if (transinfo->objmod != NULL) { @@ -288,39 +324,39 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { return 0; } -//This function runs a decision after all objects involved in TRANS_REQUEST -//and returns the appropriate control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT to the Ccordinator -char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) { - int val; +/* This function increments counters while running a voting decision on all objects involved + * in TRANS_REQUEST */ + +char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) { + int val, i = 0; short version; - char control = 0, ctrlmissoid, *ptr; - int i, j = 0; + char control = 0, *ptr; unsigned int oid; unsigned int *oidnotfound, *oidlocked, *oidmod; + void *mobj; + objheader_t *headptr; + /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int)); - // Counters and arrays to formulate decision on control message to be sent - // version match or no match int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; int objmodnotfound = 0, nummodfound = 0; - void *mobj; - objheader_t *headptr; - - //Process each object present in the pile + + /* modptr points to the beginning of the object store + * created at the Pariticipant. + * Object store holds the modified objects involved in the transaction request */ ptr = modptr; - //Process each oid in the machine pile/ group per thread - //Should be a new function + /* Process each oid in the machine pile/ group per thread */ for (i = 0; i < fixed->numread + fixed->nummod; i++) { - if (i < fixed->numread) {//Object is read + if (i < fixed->numread) {//Objs only read and not modified int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array incr *= i; oid = *((unsigned int *)(objread + incr)); incr += sizeof(unsigned int); version = *((short *)(objread + incr)); - } else {//Obj is modified + } else {//Objs modified headptr = (objheader_t *) ptr; oid = headptr->oid; oidmod[objmod] = oid;//Array containing modified oids @@ -328,42 +364,46 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran version = headptr->version; ptr += sizeof(objheader_t) + classsize[headptr->type]; } - //Check if object is still present in the machine since the beginning of TRANS_REQUEST - if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found - //Save the oids not found for later use + + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + + if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid; objnotfound++; - } else { // If obj found in machine (i.e. has not moved) - //Check if obj is locked + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ if ((((objheader_t *)mobj)->status & LOCK) == LOCK) { - if (version == ((objheader_t *)mobj)->version) { // If version match - printf("DEBUG -> obj = %d locked\n", ((objheader_t *)mobj)->oid); + if (version == ((objheader_t *)mobj)->version) { /* If not locked then match versions */ v_matchlock++; - } else {//If versions don't match ..HARD ABORT + } else {/* If versions don't match ...HARD ABORT */ v_nomatch++; - //send TRANS_DISAGREE to Coordinator + /* Send TRANS_DISAGREE to Coordinator */ control = TRANS_DISAGREE; - if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) { + if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { perror("Error in sending control to the Coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd); + printf("DEBUG -> Sending TRANS_DISAGREE\n"); return control; } - } else {//Obj is not locked , so lock object + } else {/* If Obj is not locked then lock object */ ((objheader_t *)mobj)->status |= LOCK; - // TESTING Add sleep to make transactions run for a long time such that - // we can test for soft abort case - sleep(1); - //Save all object oids that are locked on this machine during this transaction request call + + /*TESTING Add random wait to make transactions run for a long time such that + * we can test for soft abort case */ + + randomdelay(); + + /* Save all object oids that are locked on this machine during this transaction request call */ oidlocked[objlocked] = ((objheader_t *)mobj)->oid; objlocked++; - if (version == ((objheader_t *)mobj)->version) { //If versions match + if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ v_matchnolock++; - } else { //If versions don't match + } else { /* If versions don't match ...HARD ABORT */ v_nomatch++; - //send TRANS_DISAGREE to Coordinator control = TRANS_DISAGREE; + /* Send TRANS_DISAGREE to Coordinator */ if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { perror("Error in sending control to the Coordinator\n"); return 0; @@ -375,10 +415,26 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran } } - //Decide what control message(s) to send - // Should be a new function - if(v_matchnolock == fixed->numread + fixed->nummod) { - //send TRANS_AGREE to Coordinator + /* Decide what control message to send to Coordinator */ + if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked, + modptr, oidnotfound, oidlocked, oidmod, acceptfd)) == 0) { + printf("Error in decideCtrlMessage %s, %d\n", __FILE__, __LINE__); + return 0; + } + + return val; + +} +/* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT + * to send to Coordinator based on the votes of oids involved in the transaction */ +int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, + int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, + unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidmod, + int acceptfd) { + int val; + char control = 0; + /* Condition to send TRANS_AGREE */ + if(*(v_matchnolock) == fixed->numread + fixed->nummod) { control = TRANS_AGREE; if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { perror("Error in sending control to Coordinator\n"); @@ -386,86 +442,75 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran } printf("DEBUG -> Sending TRANS_AGREE\n"); } - //Condition to send TRANS_SOFT_ABORT - if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { + /* Condition to send TRANS_SOFT_ABORT */ + if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) { control = TRANS_SOFT_ABORT; char msg[]={TRANS_SOFT_ABORT, 0,0,0,0}; - *((int*)&msg[1])= objnotfound; + *((int*)&msg[1])= *(objnotfound); printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); - //Sending control message + /* Send control message */ if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) { perror("Error in sending no of objects that are not found\n"); return 0; } - //send number of oids not found and the missing oids if objects are missing in the machine - if(objnotfound != 0) { - int size = sizeof(unsigned int)*objnotfound; - if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) { + /* Send number of oids not found and the missing oids if objects are missing in the machine */ + if(*(objnotfound) != 0) { + int size = sizeof(unsigned int)* *(objnotfound); + if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) { perror("Error in sending objects that are not found\n"); return 0; } } } - - //Do the following when TRANS_DISAGREE is sent - if(control == TRANS_DISAGREE) { - //Set the reference count of the object to 1 in mainstore for garbage collection - ptr = modptr; - for(i = 0; i< fixed->nummod; i++) { - headptr = (objheader_t *) ptr; - headptr->rcount = 1; - ptr += sizeof(objheader_t) + classsize[headptr->type]; - } - //Unlock objects that was locked in the trans - for(i = 0; i< objlocked ; i++) { - mobj = mhashSearch(oidlocked[i]);// find the header address - ((objheader_t *)mobj)->status &= ~(LOCK); - } - } - - //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT + + /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process + * if Participant receives a TRANS_COMMIT */ transinfo->objmod = oidmod; transinfo->objlocked = oidlocked; transinfo->objnotfound = oidnotfound; transinfo->modptr = modptr; transinfo->nummod = fixed->nummod; - transinfo->numlocked = objlocked; - transinfo->numnotfound = objnotfound; + transinfo->numlocked = *(objlocked); + transinfo->numnotfound = *(objnotfound); return control; } -//Process oids in the TRANS_COMMIT requested by the participant and sends an ACK back to Coordinator +/* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer + * addresses in lookup table and also changes version number + * Sends an ACK back to Coordinator */ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { objheader_t *header; int i = 0, offset = 0; char control; - //Process each modified object saved in the mainobject store + /* Process each modified object saved in the mainobject store */ for(i=0; inummod; i++) { if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) { printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); } - //change reference count of older address and free space in objstr ?? - header->rcount = 1; //Not sure what would be th val - //change ptr address in mhash table + /* Change reference count of older address and free space in objstr ?? */ + header->rcount = 1; //Not sure what would be the val + + /* Change ptr address in mhash table */ printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]); mhashRemove(transinfo->objmod[i]); mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset)); offset += sizeof(objheader_t) + classsize[header->type]; - //update object version + + /* Update object version number */ header = (objheader_t *) mhashSearch(transinfo->objmod[i]); header->version += 1; } + /* Unlock locked objects */ for(i=0; inumlocked; i++) { - //unlock objects header = (objheader_t *) mhashSearch(transinfo->objlocked[i]); header->status &= ~(LOCK); } //TODO Update location lookup table - //send ack to coordinator + /* Send ack to coordinator */ control = TRANS_SUCESSFUL; printf("DEBUG-> TRANS_SUCESSFUL\n"); if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index e9daecf7..71482691 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -18,7 +18,8 @@ #define RECEIVE_BUFFER_SIZE 2048 extern int classsize[]; - +plistnode_t *createPiles(transrecord_t *); +/* This functions inserts randowm wait delays in the order of msec */ void randomdelay(void) { struct timespec req, rem; @@ -39,6 +40,8 @@ transrecord_t *transStart() return tmp; } +/* This function finds the location of the objects involved in a transaction + * and returns the pointer to the object if found in a remote location */ objheader_t *transRead(transrecord_t *record, unsigned int oid) { unsigned int machinenumber; @@ -46,25 +49,22 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) void *objcopy; int size; void *buf; - //check cache + /* Search local cache */ if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ //printf("DEBUG -> transRead oid %d found local\n", oid); return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { - //Look up in Machine lookup table and found - + /* Look up in machine lookup table and copy into cache*/ //printf("oid is found in Local machinelookup\n"); tmp = mhashSearch(oid); size = sizeof(objheader_t)+classsize[tmp->type]; - //Copy into cache objcopy = objstrAlloc(record->cache, size); memcpy(objcopy, (void *)tmp, size); - //Insert into cache's lookup table + /* Insert into cache's lookup table */ chashInsert(record->lookupTable, objheader->oid, objcopy); return(objcopy); - } else { - //Get the object from the remote location - //printf("oid is found in remote machine\n"); + } else { /* If not found in machine look up */ + /* Get the object from the remote location */ machinenumber = lhashSearch(oid); objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { @@ -78,7 +78,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) } } } - +/* This function creates objects in the transaction record */ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) { objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type])); @@ -92,106 +92,173 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) return tmp; } -//This function decides the reponse that needs to be sent to all other machines involved in a -//transaction by the machine that initiated the transaction request - -int decideResponse(thread_data_array_t *tdata) { - char control; - int i, transagree = 0, transdisagree = 0, transsoftabort = 0; +plistnode_t *createPiles(transrecord_t *record) { + int i = 0; + unsigned int size;/* Represents number of bins in the chash table */ + chashlistnode_t *curr, *ptr, *next; + plistnode_t *pile = NULL; + unsigned int machinenum; + objheader_t *headeraddr; - //Check common data structure - for (i = 0 ; i < tdata->pilecount ; i++) { - //Switch case - control = tdata->recvmsg[i].rcv_status; - switch(control) { - case TRANS_DISAGREE: - printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n"); - transdisagree++; - break; + ptr = record->lookupTable->table; + size = record->lookupTable->size; - case TRANS_AGREE: - printf("DEBUG-> trans.c Recv TRANS_AGREE\n"); - transagree++; - break; - - case TRANS_SOFT_ABORT: - printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n"); - transsoftabort++; + for(i = 0; i < size ; i++) { + curr = &ptr[i]; + /* Inner loop to traverse the linked list of the cache lookupTable */ + while(curr != NULL) { + //if the first bin in hash table is empty + if(curr->key == 0) { break; - default: - printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); - return -1; + } + next = curr->next; + //Get machine location for object id + + if ((machinenum = lhashSearch(curr->key)) == 0) { + printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); + return NULL; + } + + if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { + printf("Error: No such oid %s, %d\n", __FILE__, __LINE__); + return NULL; + } + //Make machine groups + if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) { + printf("pInsert error %s, %d\n", __FILE__, __LINE__); + return NULL; + } + curr = next; } } + + return pile; +} +/* This function initiates the transaction commit process + * Spawns threads for each of the new connections with Participants + * by creating new piles, + * Fills the piles with necesaary information and + * Sends a transrequest() to each pile*/ +int transCommit(transrecord_t *record) { + unsigned int tot_bytes_mod, *listmid; + plistnode_t *pile; + int i, rc; + int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0; + char buffer[RECEIVE_BUFFER_SIZE],control; + char transid[TID_LEN]; + trans_req_data_t *tosend; + static int newtid = 0; + char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */ + + /* Look through all the objects in the transaction record and make piles + * for each machine involved in the transaction*/ + pile = createPiles(record); + + /* Create the packet to be sent in TRANS_REQUEST */ + + /* Count the number of participants */ + pilecount = pCount(pile); + + /* Create a list of machine ids(Participants) involved in transaction */ + if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + pListMid(pile, listmid); - //Decide what control message to send to Participant - if(transdisagree > 0) { - //Send Abort - *(tdata->replyctrl) = TRANS_ABORT; - printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); - objstrDelete(tdata->rec->cache); - chashDelete(tdata->rec->lookupTable); - free(tdata->rec); - } else if(transagree == tdata->pilecount){ - //Send Commit - *(tdata->replyctrl) = TRANS_COMMIT; - printf("DEBUG-> trans.c Sending TRANS_COMMIT\n"); - objstrDelete(tdata->rec->cache); - chashDelete(tdata->rec->lookupTable); - free(tdata->rec); - } else if(transsoftabort > 0 && transdisagree == 0) { - //Send Abort - *(tdata->replyctrl) = TRANS_ABORT; - *(tdata->replyretry) = 1; - //objstrDelete(tdata->rec->cache); - //chashDelete(tdata->rec->lookupTable); - //free(tdata->rec); - printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); - } else { - printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__); - return -1; - } + + /* Initialize thread variables, + * Spawn a thread for each Participant involved in a transaction */ + pthread_t thread[pilecount]; + pthread_attr_t attr; + pthread_cond_t tcond; + pthread_mutex_t tlock; + pthread_mutex_t tlshrd; - return 0; -} -//This function sends the final response to all threads in their respective socket id -char sendResponse(thread_data_array_t *tdata, int sd) { - int n, N, sum, oidcount = 0; - char *ptr, retval = 0; - unsigned int *oidnotfound; + thread_data_array_t *thread_data_array; + thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount); + thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ - //If the decided response is due to a soft abort and missing objects at the Participant's side - if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) { - //Read list of objects missing - if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) { - //Break if only objs are locked at the Participant side - N = oidcount * sizeof(unsigned int); - if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - } - ptr = (char *) oidnotfound; - do { - n = read(sd, ptr+sum, N-sum); - sum += n; - } while(sum < N && n !=0); + /* Initialize and set thread detach attribute */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_mutex_init(&tlock, NULL); + pthread_cond_init(&tcond, NULL); + + /* Process each machine pile */ + while(pile != NULL) { + //Create transaction id + newtid++; + //trans_req_data_t *tosend; + if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; } - retval = TRANS_SOFT_ABORT; - } - //If the decided response is TRANS_ABORT - if(*(tdata->replyctrl) == TRANS_ABORT) { - retval = TRANS_ABORT; - } - if(*(tdata->replyctrl) == TRANS_COMMIT) { - retval = TRANS_COMMIT; + tosend->f.control = TRANS_REQUEST; + sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); + tosend->f.mcount = pilecount; + tosend->f.numread = pile->numread; + tosend->f.nummod = pile->nummod; + tosend->f.sum_bytes = pile->sum_bytes; + tosend->listmid = listmid; + tosend->objread = pile->objread; + tosend->oidmod = pile->oidmod; + thread_data_array[numthreads].thread_id = numthreads; + thread_data_array[numthreads].mid = pile->mid; + thread_data_array[numthreads].pilecount = pilecount; + thread_data_array[numthreads].buffer = tosend; + thread_data_array[numthreads].recvmsg = rcvd_control_msg; + thread_data_array[numthreads].threshold = &tcond; + thread_data_array[numthreads].lock = &tlock; + thread_data_array[numthreads].count = &trecvcount; + thread_data_array[numthreads].replyctrl = &treplyctrl; + thread_data_array[numthreads].replyretry = &treplyretry; + thread_data_array[numthreads].rec = record; + + rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]); + if (rc) { + perror("Error in pthread create"); + return 1; + } + numthreads++; + //TODO frees + pile = pile->next; } - // Send response to the Participant - if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ctrl message for participant\n"); + + /* Free attribute and wait for the other threads */ + pthread_attr_destroy(&attr); + for (i = 0 ;i < pilecount ; i++) { + rc = pthread_join(thread[i], NULL); + if (rc) + { + printf("ERROR return code from pthread_join() is %d\n", rc); + return 1; + } } + + /* Free resources */ + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + free(tosend); + free(listmid); + pDelete(pile); - return retval; + /* Retry trans commit procedure if not sucessful in the first try */ + if(treplyretry == 1) { + /* wait a random amount of time */ + randomdelay(); + //sleep(1); + /* Retry the commiting transaction again */ + transCommit(record); + } + + return 0; } +/* This function sends information involved in the transaction request and + * accepts a response from particpants. + * It calls decideresponse() to decide on what control message + * to send next and sends the message using sendResponse()*/ void *transRequest(void *threadarg) { int sd, i, n; struct sockaddr_in serv_addr; @@ -202,7 +269,8 @@ void *transRequest(void *threadarg) { char machineip[16], retval; tdata = (thread_data_array_t *) threadarg; - //Send Trans Request + + /* Send Trans Request */ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Error in socket for TRANS_REQUEST\n"); return NULL; @@ -213,19 +281,19 @@ void *transRequest(void *threadarg) { midtoIP(tdata->mid,machineip); machineip[15] = '\0'; serv_addr.sin_addr.s_addr = inet_addr(machineip); - + /* Open Connection */ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { perror("Error in connect for TRANS_REQUEST\n"); return NULL; } printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip); - //Send bytes of data with TRANS_REQUEST control message + /* Send bytes of data with TRANS_REQUEST control message */ if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) { perror("Error sending fixed bytes for thread\n"); return NULL; } - //Send list of machines involved in the transaction + /* Send list of machines involved in the transaction */ { int size=sizeof(unsigned int)*tdata->pilecount; if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) { @@ -233,7 +301,7 @@ void *transRequest(void *threadarg) { return NULL; } } - //Send oids and version number tuples for objects that are read + /* Send oids and version number tuples for objects that are read */ { int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread; if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) { @@ -241,7 +309,7 @@ void *transRequest(void *threadarg) { return NULL; } } - //Send objects that are modified + /* Send objects that are modified */ for(i = 0; i < tdata->buffer->f.nummod ; i++) { int size; headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); @@ -252,21 +320,23 @@ void *transRequest(void *threadarg) { } } - //Read message control message from participant side + /* Read control message from Participant */ if((n = read(sd, &control, sizeof(char))) <= 0) { perror("Error in reading control message from Participant\n"); return NULL; } recvcontrol = control; - //Update common data structure and increment count + /* Update common data structure and increment count */ tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; - //Lock and update count + + /* Lock and update count */ //Thread sleeps until all messages from pariticipants are received by coordinator pthread_mutex_lock(tdata->lock); - (*(tdata->count))++; - + (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */ + + /* Wake up the threads and invoke decideResponse (once) */ if(*(tdata->count) == tdata->pilecount) { if (decideResponse(tdata) != 0) { printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); @@ -280,164 +350,124 @@ void *transRequest(void *threadarg) { } pthread_mutex_unlock(tdata->lock); - + + /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t + * to all participants in their respective socket */ if (sendResponse(tdata, sd) == 0) { printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); pthread_mutex_unlock(tdata->lock); close(sd); return NULL; } + + /* Close connection */ close(sd); pthread_exit(NULL); } -int transCommit(transrecord_t *record) { - chashlistnode_t *curr, *ptr, *next; - unsigned int size;//Represents number of bins in the chash table - unsigned int machinenum, tot_bytes_mod, *listmid; - objheader_t *headeraddr; - plistnode_t *tmp, *pile = NULL; - int i, rc; - int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0; - char buffer[RECEIVE_BUFFER_SIZE],control; - char transid[TID_LEN]; - static int newtid = 0; - trans_req_data_t *tosend; - char treplyctrl = 0, treplyretry = 0; //keep track of the common response that needs to be sent +/* This function decides the reponse that needs to be sent to + * all Participant machines involved in the transaction commit */ +int decideResponse(thread_data_array_t *tdata) { + char control; + int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what + message to send */ - ptr = record->lookupTable->table; - size = record->lookupTable->size; - //Look through all the objects in the cache and make piles - for(i = 0; i < size ;i++) { - curr = &ptr[i]; - //Inner loop to traverse the linked list of the cache lookupTable - while(curr != NULL) { - //if the first bin in hash table is empty - if(curr->key == 0) { + //Check common data structure + for (i = 0 ; i < tdata->pilecount ; i++) { + /*Switch on response from Participant */ + control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses + written onto the shared array */ + switch(control) { + case TRANS_DISAGREE: + printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n"); + transdisagree++; break; - } - next = curr->next; - //Get machine location for object id - - if ((machinenum = lhashSearch(curr->key)) == 0) { - printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); - return 1; - } - - if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { - printf("Error: No such oid %s, %d\n", __FILE__, __LINE__); - return 1; - } - //Make machine groups - if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) { - printf("pInsert error %s, %d\n", __FILE__, __LINE__); - return 1; - } - curr = next; - } - } - //Create the packet to be sent in TRANS_REQUEST - tmp = pile; - pilecount = pCount(pile); //Keeps track of the number of participants - - //Thread related variables - pthread_t thread[pilecount]; //Create threads for each participant - pthread_attr_t attr; - pthread_cond_t tcond; - pthread_mutex_t tlock; - pthread_mutex_t tlshrd; - //thread_data_array_t thread_data_array[pilecount]; - thread_data_array_t *thread_data_array; - - thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount); - - thread_response_t rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants - - //Initialize and set thread detach attribute - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_mutex_init(&tlock, NULL); - pthread_cond_init(&tcond, NULL); - - //Keep track of list of machine ids per transaction - if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; - } + case TRANS_AGREE: + printf("DEBUG-> trans.c Recv TRANS_AGREE\n"); + transagree++; + break; - pListMid(pile, listmid); - //Process each machine group - //Should be a new function for while loop - while(tmp != NULL) { - //Create transaction id - newtid++; - //trans_req_data_t *tosend; - if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; + case TRANS_SOFT_ABORT: + printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n"); + transsoftabort++; + break; + default: + printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); + return -1; } - tosend->f.control = TRANS_REQUEST; - sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid); - tosend->f.mcount = pilecount; - tosend->f.numread = tmp->numread; - tosend->f.nummod = tmp->nummod; - tosend->f.sum_bytes = tmp->sum_bytes; - tosend->listmid = listmid; - tosend->objread = tmp->objread; - tosend->oidmod = tmp->oidmod; - thread_data_array[numthreads].thread_id = numthreads; - thread_data_array[numthreads].mid = tmp->mid; - thread_data_array[numthreads].pilecount = pilecount; - thread_data_array[numthreads].buffer = tosend; - thread_data_array[numthreads].recvmsg = rcvd_control_msg; - thread_data_array[numthreads].threshold = &tcond; - thread_data_array[numthreads].lock = &tlock; - thread_data_array[numthreads].count = &trecvcount; - thread_data_array[numthreads].replyctrl = &treplyctrl; - thread_data_array[numthreads].replyretry = &treplyretry; - thread_data_array[numthreads].rec = record; - - rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]); - if (rc) { - perror("Error in pthread create"); - return 1; - } - numthreads++; - //TODO frees - tmp = tmp->next; } + + /* Decide what control message to send to Participant */ + if(transdisagree > 0) { + /* Send Abort */ + *(tdata->replyctrl) = TRANS_ABORT; + printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); + objstrDelete(tdata->rec->cache); + chashDelete(tdata->rec->lookupTable); + free(tdata->rec); + } else if(transagree == tdata->pilecount){ + /* Send Commit */ + *(tdata->replyctrl) = TRANS_COMMIT; + printf("DEBUG-> trans.c Sending TRANS_COMMIT\n"); + objstrDelete(tdata->rec->cache); + chashDelete(tdata->rec->lookupTable); + free(tdata->rec); + } else if(transsoftabort > 0 && transdisagree == 0) { + /* Send Abort in soft abort case followed by retry commiting transaction again*/ + *(tdata->replyctrl) = TRANS_ABORT; + *(tdata->replyretry) = 1; + printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); + } else { + printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__); + return -1; + } + + return 0; +} +/* This function sends the final response to all threads in their respective socket id */ +char sendResponse(thread_data_array_t *tdata, int sd) { + int n, N, sum, oidcount = 0; + char *ptr, retval = 0; + unsigned int *oidnotfound; - // Free attribute and wait for the other threads - pthread_attr_destroy(&attr); - for (i = 0 ;i < pilecount ; i++) { - rc = pthread_join(thread[i], NULL); - if (rc) - { - printf("ERROR return code from pthread_join() is %d\n", rc); - return 1; + /* If the decided response is due to a soft abort and missing objects at the Participant's side */ + if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) { + /* Read list of objects missing */ + if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) { + N = oidcount * sizeof(unsigned int); + if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + } + ptr = (char *) oidnotfound; + do { + n = read(sd, ptr+sum, N-sum); + sum += n; + } while(sum < N && n !=0); } + retval = TRANS_SOFT_ABORT; + } + /* If the decided response is TRANS_ABORT */ + if(*(tdata->replyctrl) == TRANS_ABORT) { + retval = TRANS_ABORT; + } + /* If the decided response is TRANS_COMMIT */ + if(*(tdata->replyctrl) == TRANS_COMMIT) { + retval = TRANS_COMMIT; + } + /* Send response to the Participant */ + if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) { + perror("Error sending ctrl message for participant\n"); } - - //Free resources - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - free(tosend); - free(listmid); - pDelete(pile); - if(treplyretry == 1) { - //wait a random amount of time - randomdelay(); - //sleep(1); - //Retry the commiting transaction again - transCommit(record); - } - - return 0; + return retval; } -//mnun will be used to represent the machine IP address later +/* This function opens a connection, places an object read request to the + * remote machine, reads the control message and object if available and + * copies the object and its header to the local cache. + * TODO replace mnum and midtoIP() with MACHINE_IP address later */ + void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { int sd, size, val; struct sockaddr_in serv_addr; @@ -458,7 +488,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { midtoIP(mnum,machineip); machineip[15] = '\0'; serv_addr.sin_addr.s_addr = inet_addr(machineip); - + /* Open connection */ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { perror("Error in connect\n"); return NULL; @@ -474,7 +504,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { #ifdef DEBUG1 printf("DEBUG -> ready to rcv ...\n"); #endif - //Read response from the Participant + /* Read response from the Participant */ if((val = read(sd, &control, sizeof(char))) <= 0) { perror("No control response for getRemoteObj sent\n"); return NULL; @@ -484,6 +514,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { printf("DEBUG -> Control OBJECT_NOT_FOUND received\n"); return NULL; case OBJECT_FOUND: + /* Read object if found into local cache */ if((val = read(sd, &size, sizeof(int))) <= 0) { perror("No size is read from the participant\n"); return NULL; @@ -493,13 +524,14 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { perror("No objects are read from the remote participant\n"); return NULL; } - //Insert into cache's lookup table + /* Insert into cache's lookup table */ chashInsert(record->lookupTable, oid, objcopy); break; default: printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__); return NULL; } + /* Close connection */ close(sd); return objcopy; } -- 2.34.1