From 2f8b944454ee4b9f00136308670342b52099371d Mon Sep 17 00:00:00 2001 From: adash Date: Thu, 20 Sep 2007 08:09:59 +0000 Subject: [PATCH] Minor bug fix for trans abort case --- .../src/Runtime/DSTM/interface/dstmserver.c | 17 ++++++++------ Robust/src/Runtime/DSTM/interface/trans.c | 22 +++++++++++-------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 2cea115c..7b27ce8d 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -24,6 +24,7 @@ extern int classsize[]; objstr_t *mainobjstore; pthread_mutex_t mainobjstore_mutex; +pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */ /* This function initializes the main objects store and creates the * global machine and location lookup table */ @@ -31,7 +32,11 @@ pthread_mutex_t mainobjstore_mutex; int dstmInit(void) { mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE); - pthread_mutex_init(&mainobjstore_mutex, NULL); + /* Initialize attribute for mutex */ + pthread_mutexattr_init(&mainobjstore_mutex_attr); + pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); + //pthread_mutex_init(&mainobjstore_mutex, NULL); + pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr); if (mhashCreate(HASH_SIZE, LOADFACTOR)) return 1; //failure @@ -357,7 +362,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, } /* Send ack to Coordinator */ - printf("DEBUG -> Recv TRANS_ABORT\n"); sendctrl = TRANS_SUCESSFUL; if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending ACK to coordinator\n"); @@ -377,7 +381,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, case TRANS_COMMIT: /* Invoke the transCommit process() */ - printf("DEBUG -> Recv TRANS_COMMIT \n"); if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) { printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__); /* Free memory */ @@ -479,7 +482,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne perror("Error in sending control to the Coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_DISAGREE\n"); return control; } } else {/* If Obj is not locked then lock object */ @@ -503,7 +505,10 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne perror("Error in sending control to the Coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_DISAGREE\n"); + if (objlocked > 0) { + STATUS(((objheader_t *)mobj)) &= ~(LOCK); + free(oidlocked); + } return control; } } @@ -535,7 +540,6 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int * perror("Error in sending control to Coordinator\n"); return 0; } - printf("DEBUG -> Sending TRANS_AGREE\n"); } /* Condition to send TRANS_SOFT_ABORT */ if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) { @@ -543,7 +547,6 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int * char msg[]={TRANS_SOFT_ABORT, 0,0,0,0}; *((int*)&msg[1])= *(objnotfound); - printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); /* Send control message */ if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) { perror("Error in sending no of objects that are not found\n"); diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index fd0cd0d6..6e6caba5 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -173,7 +173,8 @@ void randomdelay(void) t = time(NULL); req.tv_sec = 0; req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec - nanosleep(&req, &rem); + //nanosleep(&req, &rem); + nanosleep(&req, NULL); return; } @@ -203,7 +204,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { rc = gettimeofday(&tp, NULL); /* Convert from timeval to timespec */ - ts.tv_nsec = tp.tv_usec * 10; + ts.tv_nsec = tp.tv_usec * 1000; /* Search local transaction cache */ if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ @@ -535,7 +536,7 @@ int transCommit(transrecord_t *record) { free(ltdata); /* wait a random amount of time */ - if (treplyretry == 1) + if (treplyretry == 1) randomdelay(); /* Retry trans commit procedure if not sucessful in the first try */ @@ -581,7 +582,6 @@ void *transRequest(void *threadarg) { pthread_exit(NULL); } - printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip); /* Send bytes of data with TRANS_REQUEST control message */ if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) { perror("Error sending fixed bytes for thread\n"); @@ -700,7 +700,9 @@ void decideResponse(thread_data_array_t *tdata) { return; } -/* This function sends the final response to remote machines per thread in their respective socket id */ +/* This function sends the final response to remote machines per thread in their respective socket id + * It returns a char that is only needed to check the correctness of execution of this function inside + * transRequest()*/ char sendResponse(thread_data_array_t *tdata, int sd) { int n, N, sum, oidcount = 0; char *ptr, retval = 0; @@ -1007,7 +1009,6 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int } /* Send ack to Coordinator */ - printf("TRANS_SUCCESSFUL\n"); /*Free the pointer */ ptr = NULL; @@ -1329,6 +1330,7 @@ void *transPrefetch(void *t) { /* dequeue node to create a machine piles and finally unlock mutex */ if((qnode = pre_dequeue()) == NULL) { printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&pqueue.qlock); pthread_exit(NULL); } pthread_mutex_unlock(&pqueue.qlock); @@ -1372,6 +1374,7 @@ void *mcqProcess(void *threadid) { /* Dequeue node to send remote machine connections*/ if((mcpilenode = mcpiledequeue()) == NULL) { printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&mcqueue.qlock); pthread_exit(NULL); } /* Unlock mutex */ @@ -1518,19 +1521,20 @@ void getPrefetchResponse(int count, int sd) { prehashInsert(oid, modptr); } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) { /* Add the new object ptr to hash table */ + prehashRemove(oid); prehashInsert(oid, modptr); - } else { /* Do nothing */ + } else { /* Do nothing: TODO modptr should be reference counted */ ; } } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/ prehashInsert(oid, modptr); } /* Lock the Prefetch Cache look up table*/ - pthread_mutex_lock(&pflookup.lock); + //pthread_mutex_lock(&pflookup.lock); /* Broadcast signal on prefetch cache condition variable */ pthread_cond_broadcast(&pflookup.cond); /* Unlock the Prefetch Cache look up table*/ - pthread_mutex_unlock(&pflookup.lock); + //pthread_mutex_unlock(&pflookup.lock); } else if(buffer[index] == OBJECT_NOT_FOUND) { /* Increment it to get the object */ /* TODO: For each object not found query DHT for new location and retrieve the object */ -- 2.34.1