From c806180654c724fe3051c7dda962dffbc93976e8 Mon Sep 17 00:00:00 2001 From: adash Date: Wed, 21 Oct 2009 01:47:22 +0000 Subject: [PATCH] add changes to release locks early on version mismatch and soft abort --- Robust/src/Runtime/DSTM/interface/dstm.h | 8 +- .../src/Runtime/DSTM/interface/dstmserver.c | 287 +++++++++++++----- Robust/src/Runtime/DSTM/interface/trans.c | 6 + 3 files changed, 216 insertions(+), 85 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 7762784b..2e9c6e9d 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -242,10 +242,14 @@ char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int); int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int); void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid); -void getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *, +char getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, int *, int *, char *, unsigned int, unsigned short); -void getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, +char getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, int *, int *, char *, unsigned int, unsigned short); + +void procRestObjs(char *, char *, int , int, int, unsigned int *, unsigned int *, int *, int *, int *, int *); +void processVerNoMatch(unsigned int *, unsigned int *, int *, int *, int *, int *, unsigned int, unsigned short); + /* end server portion */ /* Prototypes for transactions */ diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index fc95dbc3..4122ddc8 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -384,7 +384,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, if (fixed->nummod > 0) free(modptr); /* Unlock objects that was locked due to this transaction */ - int useWriteUnlock = 0; + int useWriteUnlock = 0; //TODO verify is this piece of unlocking code ever used for(i = 0; i< transinfo->numlocked; i++) { if(transinfo->objlocked[i] == -1) { useWriteUnlock = 1; @@ -459,6 +459,8 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne * Object store holds the modified objects involved in the transaction request */ ptr = (char *) modptr; + char retval; + /* Process each oid in the machine pile/ group per thread */ for (i = 0; i < fixed->numread + fixed->nummod; i++) { if (i < fixed->numread) { //Objs only read and not modified @@ -467,11 +469,11 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne oid = *((unsigned int *)(objread + incr)); incr += sizeof(unsigned int); version = *((unsigned short *)(objread + incr)); - getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch, + retval=getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version); } else { //Objs modified if(i == fixed->numread) { - oidlocked[objlocked++] = -1; + oidlocked[objlocked++] = -1; } int tmpsize; headptr = (objheader_t *) ptr; @@ -479,10 +481,40 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne version = headptr->version; GETSIZE(tmpsize, headptr); ptr += sizeof(objheader_t) + tmpsize; - getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, + retval=getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version); } + if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) { + //unlock objects as soon versions mismatch or locks cannot be acquired) + if (objlocked > 0) { + int useWriteUnlock = 0; + for(j = 0; j < objlocked; j++) { + if(oidlocked[j] == -1) { + useWriteUnlock = 1; + continue; + } + if((headptr = mhashSearch(oidlocked[j])) == NULL) { + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 0; + } + if(useWriteUnlock) { + write_unlock(STATUSPTR(headptr)); + } else { + read_unlock(STATUSPTR(headptr)); + } + } + if(v_nomatch > 0) + free(oidlocked); + } + objlocked=0; + break; + } + } + //go through rest of the objects for version mismatches + if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) { + i++; + procRestObjs(objread, ptr, i, fixed->numread, fixed->nummod, oidnotfound, oidvernotmatch, &objnotfound, &objvernotmatch, &v_nomatch, &numBytes); } /* send TRANS_DISAGREE and objs*/ @@ -499,25 +531,28 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne offset += size; } #endif + /* if (objlocked > 0) { int useWriteUnlock = 0; for(j = 0; j < objlocked; j++) { - if(oidlocked[j] == -1) { - useWriteUnlock = 1; - continue; - } - if((headptr = mhashSearch(oidlocked[j])) == NULL) { - printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 0; - } - if(useWriteUnlock) { - write_unlock(STATUSPTR(headptr)); - } else { - read_unlock(STATUSPTR(headptr)); - } + if(oidlocked[j] == -1) { + useWriteUnlock = 1; + continue; + } + if((headptr = mhashSearch(oidlocked[j])) == NULL) { + printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 0; + } + if(useWriteUnlock) { + write_unlock(STATUSPTR(headptr)); + } else { + read_unlock(STATUSPTR(headptr)); + } } free(oidlocked); } + */ + //control=TRANS_DISAGREE; send_data(acceptfd, &control, sizeof(char)); #ifdef CACHE send_data(acceptfd, &numBytes, sizeof(int)); @@ -540,7 +575,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne } /* Update Commit info for objects that are read */ -void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, +char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch, int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) { @@ -551,11 +586,13 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, /* Save the oids not found and number of oids not found for later use */ oidnotfound[*objnotfound] = oid; (*objnotfound)++; + *control = TRANS_DISAGREE; } else { /* If Obj found in machine (i.e. has not moved) */ /* Check if Obj is locked by any previous transaction */ if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock if (version == ((objheader_t *)mobj)->version) { /* match versions */ (*v_matchnolock)++; + *control = TRANS_AGREE; } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; oidvernotmatch[*objvernotmatch] = oid; @@ -573,6 +610,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, } else { //we are locked if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ (*v_matchlock)++; + *control=TRANS_SOFT_ABORT; } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; oidvernotmatch[*objvernotmatch] = oid; @@ -586,10 +624,11 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked, } } } + return *control; } /* Update Commit info for objects that are read */ -void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch, +char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) { void *mobj; @@ -598,11 +637,13 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked /* Save the oids not found and number of oids not found for later use */ oidnotfound[*objnotfound] = oid; (*objnotfound)++; + *control = TRANS_DISAGREE; } else { /* If Obj found in machine (i.e. has not moved) */ /* Check if Obj is locked by any previous transaction */ if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks if (version == ((objheader_t *)mobj)->version) { /* match versions */ (*v_matchnolock)++; + *control=TRANS_AGREE; } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; oidvernotmatch[(*objvernotmatch)++] = oid; @@ -619,6 +660,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked } else { /* Some other transaction has aquired a write lock on this object */ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ (*v_matchlock)++; + *control=TRANS_SOFT_ABORT; } else { /* If versions don't match ...HARD ABORT */ (*v_nomatch)++; oidvernotmatch[*objvernotmatch] = oid; @@ -632,83 +674,162 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked } } } + return *control; } -/* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT - * to send to Coordinator based on the votes of oids involved in the transaction */ -char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, - int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, - unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) { - int val; - char control = 0; - - /* Condition to send TRANS_AGREE */ - if(*(v_matchnolock) == fixed->numread + fixed->nummod) { - control = TRANS_AGREE; - /* Send control message */ - send_data(acceptfd, &control, sizeof(char)); +void procRestObjs(char *objread, + char *objmod, + int index, + int numread, + int nummod, + unsigned int *oidnotfound, + unsigned int *oidvernotmatch, + int *objnotfound, + int *objvernotmatch, + int *v_nomatch, + int *numBytes) { + int i; + unsigned int oid; + unsigned short version; + + /* Process each oid in the machine pile/ group per thread */ + //printf("DEBUG: index= %d, numread= %d, nummod= %d numread+nummod= %d\n", index,numread,nummod,numread+nummod); + for (i = index; i < numread+nummod; i++) { + //printf("DEBUG: i= %d\n", i); + //fflush(stdout); + if (i < numread) { //Objs only read and not modified + int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array + incr *= i; + oid = *((unsigned int *)(objread + incr)); + incr += sizeof(unsigned int); + version = *((unsigned short *)(objread + incr)); + } else { //Objs modified + objheader_t *headptr; + headptr = (objheader_t *) objmod; + oid = OID(headptr); + version = headptr->version; + int tmpsize; + GETSIZE(tmpsize, headptr); + objmod += sizeof(objheader_t) + tmpsize; + } + processVerNoMatch(oidnotfound, + oidvernotmatch, + objnotfound, + objvernotmatch, + v_nomatch, + numBytes, + oid, + version); } - /* Condition to send TRANS_SOFT_ABORT */ - if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) { - control = TRANS_SOFT_ABORT; + return; +} - /* Send control message */ - send_data(acceptfd, &control, sizeof(char)); +void processVerNoMatch(unsigned int *oidnotfound, + unsigned int *oidvernotmatch, + int *objnotfound, + int *objvernotmatch, + int *v_nomatch, + int *numBytes, + unsigned int oid, + unsigned short version) { + void *mobj; + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ - /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */ - if(*(objnotfound) != 0) { - int msg[1]; - msg[0] = *(objnotfound); - send_data(acceptfd, &msg, sizeof(int)); - int size = sizeof(unsigned int)* *(objnotfound); - send_data(acceptfd, oidnotfound, size); + if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + oidnotfound[*objnotfound] = oid; + (*objnotfound)++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ + //if (!write_trylock(STATUSPTR(mobj))) { // Can acquire write lock + if (version != ((objheader_t *)mobj)->version) { /* match versions */ + (*v_nomatch)++; + oidvernotmatch[*objvernotmatch] = oid; + (*objvernotmatch)++; + int size; + GETSIZE(size, mobj); + size += sizeof(objheader_t); + *numBytes += size; } } - - /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process - * if Participant receives a TRANS_COMMIT */ - transinfo->objlocked = oidlocked; - transinfo->objnotfound = oidnotfound; - transinfo->modptr = modptr; - transinfo->numlocked = *(objlocked); - transinfo->numnotfound = *(objnotfound); - return control; } -/* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer - * addresses in lookup table and also changes version number - * Sends an ACK back to Coordinator */ -int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) { - objheader_t *header; - objheader_t *newheader; - int i = 0, offset = 0; - char control; - int tmpsize; - - /* Process each modified object saved in the mainobject store */ - for(i = 0; i < nummod; i++) { - if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { - printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - return 1; + /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT + * to send to Coordinator based on the votes of oids involved in the transaction */ + char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, + int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, + unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) { + int val; + char control = 0; + + /* Condition to send TRANS_AGREE */ + if(*(v_matchnolock) == fixed->numread + fixed->nummod) { + control = TRANS_AGREE; + /* Send control message */ + send_data(acceptfd, &control, sizeof(char)); } - GETSIZE(tmpsize,header); - - { - struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t)); - struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset); - dst->type=src->type; - dst->___cachedCode___=src->___cachedCode___; - dst->___cachedHash___=src->___cachedHash___; - memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___)); - } - header->version += 1; - /* If threads are waiting on this object to be updated, notify them */ - if(header->notifylist != NULL) { - notifyAll(&header->notifylist, OID(header), header->version); + /* Condition to send TRANS_SOFT_ABORT */ + if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) { + //if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) { + control = TRANS_SOFT_ABORT; + + /* Send control message */ + send_data(acceptfd, &control, sizeof(char)); + + /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */ + if(*(objnotfound) != 0) { + int msg[1]; + msg[0] = *(objnotfound); + send_data(acceptfd, &msg, sizeof(int)); + int size = sizeof(unsigned int)* *(objnotfound); + send_data(acceptfd, oidnotfound, size); + } } - offset += sizeof(objheader_t) + tmpsize; + + /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process + * if Participant receives a TRANS_COMMIT */ + transinfo->objlocked = oidlocked; + transinfo->objnotfound = oidnotfound; + transinfo->modptr = modptr; + transinfo->numlocked = *(objlocked); + transinfo->numnotfound = *(objnotfound); + return control; } + /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer + * addresses in lookup table and also changes version number + * Sends an ACK back to Coordinator */ + int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) { + objheader_t *header; + objheader_t *newheader; + int i = 0, offset = 0; + char control; + int tmpsize; + + /* Process each modified object saved in the mainobject store */ + for(i = 0; i < nummod; i++) { + if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { + printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + GETSIZE(tmpsize,header); + + { + struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t)); + struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset); + dst->type=src->type; + dst->___cachedCode___=src->___cachedCode___; + dst->___cachedHash___=src->___cachedHash___; + memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___)); + } + header->version += 1; + /* If threads are waiting on this object to be updated, notify them */ + if(header->notifylist != NULL) { + notifyAll(&header->notifylist, OID(header), header->version); + } + offset += sizeof(objheader_t) + tmpsize; + } + if (nummod > 0) free(modptr); diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 81cd2081..9007bc7e 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -79,6 +79,7 @@ void printhex(unsigned char *, int); plistnode_t *createPiles(); plistnode_t *sortPiles(plistnode_t *pileptr); +//#define LOGEVENTS #ifdef LOGEVENTS char bigarray[16*1024*1024]; int bigindex=0; @@ -666,6 +667,7 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) { } else { prehashInsert(oid, headerObj); } + LOGEVENT('B'); #endif return &objcopy[1]; #else @@ -773,6 +775,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) { } else { prehashInsert(oid, headerObj); } + LOGEVENT('B'); #endif return &objcopy[1]; #else @@ -1049,6 +1052,7 @@ int transCommit() { } else { prehashInsert(oidToPrefetch, header); } + LOGEVENT('E'); length = length - size; offset += size; } @@ -1135,6 +1139,7 @@ int transCommit() { if(finalResponse == TRANS_ABORT) { //printf("Aborting trans\n"); #ifdef TRANSSTATS + LOGEVENT('A'); numTransAbort++; #endif /* Free Resources */ @@ -1143,6 +1148,7 @@ int transCommit() { return TRANS_ABORT; } else if(finalResponse == TRANS_COMMIT) { #ifdef TRANSSTATS + LOGEVENT('C'); numTransCommit++; #endif /* Free Resources */ -- 2.34.1