From ab05b9620223b1e08afaf7a19941858de4ff5486 Mon Sep 17 00:00:00 2001 From: adash Date: Sat, 22 Sep 2007 00:06:08 +0000 Subject: [PATCH] Fixed bugs..Atomic2.java testcase works fine --- Robust/src/Runtime/DSTM/interface/dstm.h | 8 +- .../src/Runtime/DSTM/interface/dstmserver.c | 68 ++--- Robust/src/Runtime/DSTM/interface/objstr.c | 4 + Robust/src/Runtime/DSTM/interface/trans.c | 264 ++++++++---------- Robust/src/Runtime/callconventions | 10 + 5 files changed, 150 insertions(+), 204 deletions(-) create mode 100644 Robust/src/Runtime/callconventions diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 9ef13b5a..ede8fdda 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -226,12 +226,14 @@ objheader_t *transRead(transrecord_t *, unsigned int); objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid int transCommit(transrecord_t *record); //return 0 if successful void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins -void *handleLocalReq(void *); //the C routine that the local m/c thread will execute void decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); -int transAbortProcess(void *, unsigned int *, int, int); -int transComProcess(void*, unsigned int *, unsigned int *, unsigned int *, int, int, int); + +void *handleLocalReq(void *); +int transComProcess(local_thread_data_array_t *); +int transAbortProcess(local_thread_data_array_t *); + void prefetch(int, unsigned int *, unsigned short *, short*); void *transPrefetch(void *); void *mcqProcess(void *); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 144e5e81..00b54b0b 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -203,13 +203,9 @@ void *dstmAccept(void *acceptfd) printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD\n", retval); else - { //TODO: execute run method on this global thread object - printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=0x%x\n", oid); + { objType = getObjType(oid); - printf("dstmAccept(): type of object 0x%x is %d\n", oid, objType); startDSMthread(oid, objType); - - } break; @@ -270,16 +266,11 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { } /* Read modified objects */ - if(fixed.nummod != 0) { // If pile contains more than one modified object, - // allocate new object store and recv all modified objects - // TODO deallocate this space - pthread_mutex_lock(&mainobjstore_mutex); - if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) { - printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&mainobjstore_mutex); + if(fixed.nummod != 0) { + if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) { + printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__); return 1; } - pthread_mutex_unlock(&mainobjstore_mutex); sum = 0; do { // Recv the objs that are modified by the Coordinator n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0); @@ -310,7 +301,6 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { /* Free resources */ if(oidmod != NULL) { free(oidmod); - oidmod = NULL; } return 1; } @@ -318,7 +308,6 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { /* Free resources */ if(oidmod != NULL) { free(oidmod); - oidmod = NULL; } return 0; @@ -349,15 +338,8 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, /* Process the new control message */ switch(control) { case TRANS_ABORT: - /* Set all ref counts as 1 and do garbage collection */ - ptr = modptr; - for(i = 0; i< fixed->nummod; i++) { - int tmpsize; - tmp_header = (objheader_t *)ptr; - tmp_header->rcount = 0; - GETSIZE(tmpsize, tmp_header); - ptr += sizeof(objheader_t) + tmpsize; - } + if (fixed->nummod > 0) + free(modptr); /* Unlock objects that was locked due to this transaction */ for(i = 0; i< transinfo->numlocked; i++) { header = mhashSearch(transinfo->objlocked[i]);// find the header address @@ -385,8 +367,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) { printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__); /* Free memory */ - printf("DEBUG -> Freeing...\n"); - fflush(stdout); if (transinfo->objlocked != NULL) { free(transinfo->objlocked); } @@ -408,9 +388,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, } /* Free memory */ - printf("DEBUG -> Freeing...\n"); - fflush(stdout); - if (transinfo->objlocked != NULL) { free(transinfo->objlocked); } @@ -425,7 +402,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) { int val, i = 0; - short version; + unsigned short version; char control = 0, *ptr; unsigned int oid; unsigned int *oidnotfound, *oidlocked; @@ -450,7 +427,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne incr *= i; oid = *((unsigned int *)(objread + incr)); incr += sizeof(unsigned int); - version = *((short *)(objread + incr)); + version = *((unsigned short *)(objread + incr)); } else {//Objs modified int tmpsize; headptr = (objheader_t *) ptr; @@ -483,12 +460,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne } } else {/* If Obj is not locked then lock object */ STATUS(((objheader_t *)mobj)) |= LOCK; - - /*TESTING Add random wait to make transactions run for a long time such that - * we can test for soft abort case */ - - //randomdelay(); - /* Save all object oids that are locked on this machine during this transaction request call */ oidlocked[objlocked] = OID(((objheader_t *)mobj)); objlocked++; @@ -575,8 +546,10 @@ int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int * * Sends an ACK back to Coordinator */ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) { objheader_t *header; + objheader_t *newheader; int i = 0, offset = 0; char control; + int tmpsize; /* Process each modified object saved in the mainobject store */ for(i = 0; i < nummod; i++) { @@ -584,22 +557,17 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; } - /* Change reference count of older address and free space in objstr ?? */ - header->rcount = 0; - - /* Change ptr address in mhash table */ - mhashRemove(oidmod[i]); - mhashInsert(oidmod[i], (((char *)modptr) + offset)); - { - int tmpsize; - GETSIZE(tmpsize,header); - offset += sizeof(objheader_t) + tmpsize; - } - /* Update object version number */ - header = (objheader_t *) mhashSearch(oidmod[i]); + GETSIZE(tmpsize,header); + pthread_mutex_lock(&mainobjstore_mutex); + memcpy(header, (char *)modptr + offset, tmpsize + sizeof(objheader_t)); header->version += 1; + pthread_mutex_unlock(&mainobjstore_mutex); + offset += sizeof(objheader_t) + tmpsize; } + if (nummod > 0) + free(modptr); + /* Unlock locked objects */ for(i = 0; i < numlocked; i++) { if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { diff --git a/Robust/src/Runtime/DSTM/interface/objstr.c b/Robust/src/Runtime/DSTM/interface/objstr.c index 4a66fa54..08b3c571 100644 --- a/Robust/src/Runtime/DSTM/interface/objstr.c +++ b/Robust/src/Runtime/DSTM/interface/objstr.c @@ -38,12 +38,16 @@ void *objstrAlloc(objstr_t *store, unsigned int size) if (size > DEFAULT_OBJ_STORE_SIZE) //in case of large objects { store->next = (objstr_t *)malloc(sizeof(objstr_t) + size); + if (store->next == NULL) + return NULL; store = store->next; store->size = size; } else { store->next = malloc(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE); + if (store->next == NULL) + return NULL; store = store->next; store->size = DEFAULT_OBJ_STORE_SIZE; } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index cf7f77e7..e76fc95c 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -46,6 +46,22 @@ unsigned int oidsPerBlock; unsigned int oidMin; unsigned int oidMax; +void printhex(unsigned char *, int); + +void printhex(unsigned char *ptr, int numBytes) +{ + int i; + for (i = 0; i < numBytes; i++) + { + if (ptr[i] < 16) + printf("0%x ", ptr[i]); + else + printf("%x ", ptr[i]); + } + printf("\n"); + return; +} + plistnode_t *createPiles(transrecord_t *); inline int arrayLength(int *array) { int i; @@ -186,7 +202,6 @@ void randomdelay(void) /* This function initializes things required in the transaction start*/ transrecord_t *transStart() { - printf("Starting transaction\n"); transrecord_t *tmp = malloc(sizeof(transrecord_t)); tmp->cache = objstrCreate(1048576); tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); @@ -209,7 +224,15 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { rc = gettimeofday(&tp, NULL); + /* 1ms delay */ + tp.tv_usec += 1000; + if (tp.tv_usec >= 1000000) + { + tp.tv_usec -= 1000000; + tp.tv_sec += 1; + } /* Convert from timeval to timespec */ + ts.tv_sec = tp.tv_sec; ts.tv_nsec = tp.tv_usec * 1000; /* Search local transaction cache */ @@ -223,7 +246,6 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { /* Look up in machine lookup table and copy into cache*/ GETSIZE(size, objheader); size += sizeof(objheader_t); - //TODO:Lock the local trans cache while copying the object here objcopy = objstrAlloc(record->cache, size); memcpy(objcopy, (void *)objheader, size); /* Insert into cache's lookup table */ @@ -234,10 +256,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { return objcopy; #endif } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */ - found = 1; GETSIZE(size, tmp); size+=sizeof(objheader_t); - //TODO:Lock the local trans cache while copying the object here objcopy = objstrAlloc(record->cache, size); memcpy(objcopy, (void *)tmp, size); /* Insert into cache's lookup table */ @@ -252,26 +272,24 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { pthread_mutex_lock(&pflookup.lock); while(!found) { rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts); - if(rc == ETIMEDOUT) { - printf("Wait timed out\n"); - /* Check Prefetch cache again */ - if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) { - found = 1; - GETSIZE(size,tmp); - size+=sizeof(objheader_t); - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)tmp, size); - chashInsert(record->lookupTable, OID(tmp), objcopy); - pthread_mutex_unlock(&pflookup.lock); + /* Check Prefetch cache again */ + if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) { + found = 1; + GETSIZE(size,tmp); + size+=sizeof(objheader_t); + objcopy = objstrAlloc(record->cache, size); + memcpy(objcopy, (void *)tmp, size); + chashInsert(record->lookupTable, OID(tmp), objcopy); + pthread_mutex_unlock(&pflookup.lock); #ifdef COMPILER - return &objcopy[1]; + return &objcopy[1]; #else - return objcopy; + return objcopy; #endif - } else { + } else if (rc == ETIMEDOUT) { +// printf("Wait timed out\n"); pthread_mutex_unlock(&pflookup.lock); break; - } } } @@ -701,6 +719,11 @@ void decideResponse(thread_data_array_t *tdata) { /* Send Abort */ *(tdata->replyctrl) = TRANS_ABORT; *(tdata->replyretry) = 0; + /* clear objects from prefetch cache */ + for (i = 0; i < tdata->buffer->f.numread; i++) + prehashRemove(*(unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i)); + for (i = 0; i < tdata->buffer->f.nummod; i++) + prehashRemove(tdata->buffer->oidmod[i]); } else if(transagree == tdata->buffer->f.mcount){ /* Send Commit */ *(tdata->replyctrl) = TRANS_COMMIT; @@ -790,9 +813,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { return NULL; } -#ifdef DEBUG1 - printf("DEBUG -> ready to rcv ...\n"); -#endif /* Read response from the Participant */ if((val = read(sd, &control, sizeof(char))) <= 0) { perror("No control response for getRemoteObj sent\n"); @@ -831,73 +851,44 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { * based on common agreement it either commits or aborts the transaction. * It also frees the memory resources */ void *handleLocalReq(void *threadarg) { - int val, i = 0, size, offset = 0; - short version; - char control = 0, *ptr; - unsigned int oid; unsigned int *oidnotfound = NULL, *oidlocked = NULL; - void *mobj, *modptr; - objheader_t *headptr, *headeraddr; local_thread_data_array_t *localtdata; + int objnotfound = 0, objlocked = 0; + int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; + int numread, i; + unsigned int oid; + unsigned short version; + void *mobj; + objheader_t *headptr; localtdata = (local_thread_data_array_t *) threadarg; /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); - int objnotfound = 0, objlocked = 0; - int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; - - /* modptr points to the beginning of the object store - * created at the Pariticipant */ - pthread_mutex_lock(&mainobjstore_mutex); - if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) { - printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&mainobjstore_mutex); - pthread_exit(NULL); - } - pthread_mutex_unlock(&mainobjstore_mutex); - /* Write modified objects into the mainobject store */ - for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) { - headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]); - GETSIZE(size,headeraddr); - size+=sizeof(objheader_t); - memcpy((char *)modptr+offset, headeraddr, size); - offset += size; - } - /* Write new objects into the mainobject store */ - for(i = 0; i< localtdata->tdata->buffer->f.numcreated; i++) { - headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]); - GETSIZE(size, headeraddr); - size+=sizeof(objheader_t); - memcpy((char *)modptr+offset, headeraddr, size); - offset += size; - } - - ptr = modptr; - offset = 0; //Reset + numread = localtdata->tdata->buffer->f.numread; /* Process each oid in the machine pile/ group per thread */ for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) { - if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified + if (i < localtdata->tdata->buffer->f.numread) { int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array incr *= i; oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr)); - incr += sizeof(unsigned int); - version = *((short *)(localtdata->tdata->buffer->objread + incr)); - } else {//Objs modified + version = *((short *)(localtdata->tdata->buffer->objread + incr + sizeof(unsigned int))); + } else { // Objects Modified int tmpsize; - headptr = (objheader_t *)ptr; + headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]); + if (headptr == NULL) { + printf("Error: handleLocalReq() returning NULL\n"); + return NULL; + } oid = OID(headptr); version = headptr->version; - GETSIZE(tmpsize, headptr); - ptr += sizeof(objheader_t) + tmpsize; } - /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ /* Save the oids not found and number of oids not found for later use */ - if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ + if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ /* Save the oids not found and number of oids not found for later use */ oidnotfound[objnotfound] = oid; objnotfound++; @@ -913,9 +904,6 @@ void *handleLocalReq(void *threadarg) { } } else {/* If Obj is not locked then lock object */ STATUS(((objheader_t *)mobj)) |= LOCK; - //TODO Remove this for Testing - //randomdelay(); -- Why is this here. BCD - /* Save all object oids that are locked on this machine during this transaction request call */ oidlocked[objlocked] = OID(((objheader_t *)mobj)); objlocked++; @@ -928,8 +916,7 @@ void *handleLocalReq(void *threadarg) { } } } - } - + } // End for /* Condition to send TRANS_AGREE */ if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) { localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; @@ -937,25 +924,15 @@ void *handleLocalReq(void *threadarg) { /* Condition to send TRANS_SOFT_ABORT */ if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT; - //TODO currently the only soft abort case that is supported is when object locked by previous - //transaction => v_matchlock > 0 - //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported - /* Send number of oids not found and the missing oids if objects are missing in the machine */ - /* TODO Remember to store the oidnotfound for later use - if(objnotfound != 0) { - int size = sizeof(unsigned int)* objnotfound; - } - */ } /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process * if Participant receives a TRANS_COMMIT */ localtdata->transinfo->objlocked = oidlocked; localtdata->transinfo->objnotfound = oidnotfound; - localtdata->transinfo->modptr = modptr; + localtdata->transinfo->modptr = NULL; localtdata->transinfo->numlocked = objlocked; localtdata->transinfo->numnotfound = objnotfound; - /* Lock and update count */ //Thread sleeps until all messages from pariticipants are received by coordinator pthread_mutex_lock(localtdata->tdata->lock); @@ -969,51 +946,38 @@ void *handleLocalReq(void *threadarg) { pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock); } pthread_mutex_unlock(localtdata->tdata->lock); - - /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/ if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){ - if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) { + if(transAbortProcess(localtdata) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); pthread_exit(NULL); } - }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){ - if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) { + } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) { + if(transComProcess(localtdata) != 0) { printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); pthread_exit(NULL); } } - /* Free memory */ if (localtdata->transinfo->objlocked != NULL) { free(localtdata->transinfo->objlocked); - localtdata->transinfo->objlocked = NULL; } if (localtdata->transinfo->objnotfound != NULL) { free(localtdata->transinfo->objnotfound); - localtdata->transinfo->objnotfound = NULL; } pthread_exit(NULL); } -/* This function completes the ABORT process if the transaction is aborting -*/ -int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) { - char *ptr; - int i; - objheader_t *tmp_header; + +/* This function completes the ABORT process if the transaction is aborting */ +int transAbortProcess(local_thread_data_array_t *localtdata) { + int i, numlocked; + unsigned int *objlocked; void *header; - /* Set all ref counts as 1 and do garbage collection */ - ptr = modptr; - for(i = 0; i< nummod; i++) { - int tmpsize; - tmp_header = (objheader_t *)ptr; - tmp_header->rcount = 0; - GETSIZE(tmpsize, tmp_header); - ptr += sizeof(objheader_t) + tmpsize; - } - /* Unlock objects that was locked due to this transaction */ - for(i = 0; i< numlocked; i++) { + numlocked = localtdata->transinfo->numlocked; + objlocked = localtdata->transinfo->objlocked; + + for (i = 0; i < numlocked; i++) { if((header = mhashSearch(objlocked[i])) == NULL) { printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; @@ -1021,68 +985,68 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int STATUS(((objheader_t *)header)) &= ~(LOCK); } - /* Send ack to Coordinator */ + printf("TRANS_ABORTED at Coordinator end\n"); - /*Free the pointer */ - ptr = NULL; return 0; } -/*This function completes the COMMIT process is the transaction is commiting -*/ -int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated, unsigned int *objlocked, int nummod, int numcreated, int numlocked) { - objheader_t *header; - int i = 0, offset = 0; - char control; - - /* Process each modified object saved in the mainobject store */ - for(i = 0; i < nummod; i++) { - int tmpsize; +/*This function completes the COMMIT process is the transaction is commiting*/ +int transComProcess(local_thread_data_array_t *localtdata) { + objheader_t *header, *tcptr; + int i, nummod, tmpsize, numcreated, numlocked; + unsigned int *oidmod, *oidcreated, *oidlocked; + void *ptrcreate; + + nummod = localtdata->tdata->buffer->f.nummod; + oidmod = localtdata->tdata->buffer->oidmod; + numcreated = localtdata->tdata->buffer->f.numcreated; + oidcreated = localtdata->tdata->buffer->oidcreated; + numlocked = localtdata->transinfo->numlocked; + oidlocked = localtdata->transinfo->objlocked; + for (i = 0; i < nummod; i++) { if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { - printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + /* Copy from transaction cache -> main object store */ + if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) { + printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__); return 1; } - /* Change reference count of older address and free space in objstr ?? */ - header->rcount = 0; - - /* Change ptr address in mhash table */ - mhashRemove(oidmod[i]); //TODO: this shouldn't be necessary - mhashInsert(oidmod[i], (((char *)modptr) + offset)); GETSIZE(tmpsize, header); - offset += sizeof(objheader_t) + tmpsize; - /* Update object version number */ - header = (objheader_t *) mhashSearch(oidmod[i]); + pthread_mutex_lock(&mainobjstore_mutex); + memcpy(header, tcptr, tmpsize + sizeof(objheader_t)); header->version += 1; + pthread_mutex_unlock(&mainobjstore_mutex); } - - /*If object is in prefetch cache then update it in prefetch cache */ - - /* If object is newly created inside transaction then commit it */ - for (i = 0; i < numcreated; i++) - { - int tmpsize; - header = (objheader_t *)(((char *)modptr) + offset); - mhashInsert(oidcreated[i], (((char *)modptr) + offset)); + for (i = 0; i < numcreated; i++) { + if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) { + printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } GETSIZE(tmpsize, header); - offset += sizeof(objheader_t) + tmpsize; + pthread_mutex_lock(&mainobjstore_mutex); + if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) { + printf("Error: transComProcess() failed objstrAlloc\n"); + pthread_mutex_unlock(&mainobjstore_mutex); + return 1; + } + pthread_mutex_unlock(&mainobjstore_mutex); + memcpy(ptrcreate, header, tmpsize + sizeof(objheader_t)); + + mhashInsert(oidcreated[i], ptrcreate); lhashInsert(oidcreated[i], myIpAddr); } - /* Unlock locked objects */ for(i = 0; i < numlocked; i++) { - if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) { + if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; } STATUS(header) &= ~(LOCK); } - - //TODO Update location lookup table - - /* Send ack to Coordinator */ - printf("TRANS_SUCCESSFUL\n"); return 0; } @@ -1146,7 +1110,6 @@ void checkPrefetchTuples(prefetchqelem_t *node) { } else { k = endoffsets[i-1]; index = endoffsets[j-1]; - printf("Value of slength = %d\n", slength); for(count = 0; count < slength; count++) { if(arryfields[k] != arryfields[index]) { break; @@ -1418,7 +1381,6 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { bzero((char*) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(LISTEN_PORT); - //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); midtoIP(mcpilenode->mid ,machineip); machineip[15] = '\0'; serv_addr.sin_addr.s_addr = inet_addr(machineip); diff --git a/Robust/src/Runtime/callconventions b/Robust/src/Runtime/callconventions new file mode 100644 index 00000000..6eb59c74 --- /dev/null +++ b/Robust/src/Runtime/callconventions @@ -0,0 +1,10 @@ +All calling conventions for native methods are #defines + +We support two types of garbage collectors +1. Precise Garbage Collector +2. Conservative Garbage Collector + +CALLXX => CALL, no of additional parameters other than the name, no of parameters for garbage collection +for e.g. +CALL11(x, y, y) +call, 1= number of parameters i.e. y, 1= No of parameters to be garbage collected i.e. y -- 2.34.1