From: erubow Date: Fri, 7 Sep 2007 22:05:25 +0000 (+0000) Subject: Object creation within transaction works now. Yay! X-Git-Tag: preEdgeChange~463 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=e05aca6b246658926ad51a6168d96cd701cd5891;p=IRC.git Object creation within transaction works now. Yay! --- diff --git a/Robust/src/Runtime/DSTM/interface/clookup.c b/Robust/src/Runtime/DSTM/interface/clookup.c index ca7d34c9..45ac76c6 100644 --- a/Robust/src/Runtime/DSTM/interface/clookup.c +++ b/Robust/src/Runtime/DSTM/interface/clookup.c @@ -46,7 +46,7 @@ unsigned int chashInsert(chashtable_t *table, unsigned int key, void *val) { table->numelements++; index = chashFunction(table, key); #ifdef DEBUG - printf("DEBUG -> index = %d, key = %d, val = %x\n", index, key, val); + printf("chashInsert(): DEBUG -> index = %d, key = %d, val = %x\n", index, key, val); #endif if(ptr[index].next == NULL && ptr[index].key == 0) { // Insert at the first position in the hashtable ptr[index].key = key; diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index cf855326..2574271a 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -120,6 +120,7 @@ typedef struct fixed_data { int mcount; /* participant count */ short numread; /* no of objects read */ short nummod; /* no of objects modified */ + short numcreated; /* no of objects created */ int sum_bytes; /* total bytes of modified objects in a transaction */ } fixed_data_t; @@ -129,6 +130,7 @@ typedef struct trans_req_data { unsigned int *listmid; /* Pointer to array holding list of participants */ char *objread; /* Pointer to array holding oid and version number of objects that are only read */ unsigned int *oidmod; /* Pointer to array holding oids of objects that are modified */ + unsigned int *oidcreated; /* Pointer to array holding oids of objects that are newly created */ } trans_req_data_t; /* Structure that holds information of objects that are not found in the participant @@ -204,7 +206,6 @@ int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int); int dstmStartup(const char *); void transInit(); -void * dstmalloc(transrecord_t *trans, int size); void randomdelay(void); transrecord_t *transStart(); @@ -217,8 +218,7 @@ int decideResponse(thread_data_array_t *);// Coordinator decides what response t char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); int transAbortProcess(void *, unsigned int *, int, int); -//int transComProcess(trans_commit_data_t *); -int transComProcess(void*, unsigned int *, unsigned int *, int, int); +int transComProcess(void*, unsigned int *, unsigned int *, unsigned int *, int, int, int); void prefetch(int, unsigned int *, short *, short*); void *transPrefetch(void *); void *mcqProcess(void *); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 65460bd9..8aa6e35f 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -311,7 +311,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, ptr = modptr; for(i = 0; i< fixed->nummod; i++) { tmp_header = (objheader_t *)ptr; - tmp_header->rcount = 1; + tmp_header->rcount = 0; ptr += sizeof(objheader_t) + classsize[TYPE(tmp_header)]; } /* Unlock objects that was locked due to this transaction */ @@ -526,7 +526,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock return 1; } /* Change reference count of older address and free space in objstr ?? */ - header->rcount = 1; //Not sure what would be the val + header->rcount = 0; /* Change ptr address in mhash table */ mhashRemove(oidmod[i]); diff --git a/Robust/src/Runtime/DSTM/interface/mlookup.c b/Robust/src/Runtime/DSTM/interface/mlookup.c index 2c952638..a9cca795 100644 --- a/Robust/src/Runtime/DSTM/interface/mlookup.c +++ b/Robust/src/Runtime/DSTM/interface/mlookup.c @@ -58,7 +58,7 @@ unsigned int mhashInsert(unsigned int key, void *val) { return 1; } node->key = key; - node->val = val ; + node->val = val; node->next = ptr[index].next; ptr[index].next = node; } diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c index 7bf79ba6..e542ed94 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.c +++ b/Robust/src/Runtime/DSTM/interface/plookup.c @@ -21,20 +21,21 @@ plistnode_t *pCreate(int objects) { free(pile); return NULL; } - /* - if ((pile->oidread = calloc(objects, sizeof(unsigned int))) == NULL) { + if ((pile->oidcreated = calloc(objects, sizeof(unsigned int))) == NULL) { printf("Calloc error %s %d\n", __FILE__, __LINE__); + free(pile); + free(pile->oidmod); return NULL; } - */ if ((pile->objread = calloc(objects, sizeof(unsigned int) + sizeof(short))) == NULL) { printf("Calloc error %s %d\n", __FILE__, __LINE__); free(pile); free(pile->oidmod); + free(pile->oidcreated); return NULL; } - pile->nummod = pile->numread = pile->sum_bytes = 0; + pile->nummod = pile->numread = pile->numcreated = pile->sum_bytes = 0; pile->next = NULL; return pile; } @@ -49,12 +50,15 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi //Add oid into a machine that is already present in the pile linked list structure while(tmp != NULL) { if (tmp->mid == mid) { - if (STATUS(headeraddr) & DIRTY) { + if (STATUS(headeraddr) & NEW) { + tmp->oidcreated[tmp->numcreated] = OID(headeraddr); + tmp->numcreated = tmp->numcreated + 1; + tmp->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)]; + } else if (STATUS(headeraddr) & DIRTY) { tmp->oidmod[tmp->nummod] = OID(headeraddr); tmp->nummod = tmp->nummod + 1; tmp->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)]; } else { - // tmp->oidread[tmp->numread] = OID(headeraddr); offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread; *((unsigned int *)(tmp->objread + offset))=OID(headeraddr); offset += sizeof(unsigned int); @@ -72,12 +76,15 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi return NULL; } ptr->mid = mid; - if (STATUS(headeraddr) & DIRTY) { + if (STATUS(headeraddr) & NEW) { + ptr->oidcreated[ptr->numcreated] = OID(headeraddr); + ptr->numcreated = ptr->numcreated + 1; + ptr->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)]; + } else if (STATUS(headeraddr) & DIRTY) { ptr->oidmod[ptr->nummod] = OID(headeraddr); ptr->nummod = ptr->nummod + 1; ptr->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)]; } else { - // ptr->oidread[ptr->numread] = OID(headeraddr); *((unsigned int *)ptr->objread)=OID(headeraddr); memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short)); ptr->numread = ptr->numread + 1; @@ -86,6 +93,9 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi pile = ptr; } + STATUS(headeraddr) &= ~(NEW); + STATUS(headeraddr) &= ~(DIRTY); + return pile; } diff --git a/Robust/src/Runtime/DSTM/interface/plookup.h b/Robust/src/Runtime/DSTM/interface/plookup.h index e35c451b..777f0259 100644 --- a/Robust/src/Runtime/DSTM/interface/plookup.h +++ b/Robust/src/Runtime/DSTM/interface/plookup.h @@ -10,13 +10,13 @@ * participants involved in a transaction. */ typedef struct plistnode { unsigned int mid; - int local; /* Variable that keeps track if this pile is for LOCAL machine */ - unsigned int *oidmod; /* Pointer to array containing oids of modified objects */ - unsigned int *oidread; /* TODO: REMOVE THIS Pointer to array of objects read */ - int nummod; /* no of objects read */ - int numread; /* no of objects modified */ + short numread; /* no of objects modified */ + short nummod; /* no of objects read */ + short numcreated; /* no of objects created */ int sum_bytes; /* total bytes of objects modified */ char *objread; /* Pointer to array containing oids of objects read and their version numbers*/ + unsigned int *oidmod; /* Pointer to array containing oids of modified objects */ + unsigned int *oidcreated; /* Pointer to array containing oids of newly created objects */ struct plistnode *next; } plistnode_t; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 8e86b346..83bdaa6f 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -33,6 +33,7 @@ extern prehashtable_t pflookup; //Global Prefetch cache's lookup table pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue pthread_t tPrefetch; extern objstr_t *mainobjstore; +unsigned int myIpAddr; plistnode_t *createPiles(transrecord_t *); inline int arrayLength(int *array) { @@ -80,22 +81,14 @@ void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfi pthread_mutex_unlock(&pqueue.qlock); } -/* This function allocates an object on the local machine */ -//FIXME - -void * dstmalloc(transrecord_t *trans, int size) { - objheader_t * newobj=(objheader_t *)objstrAlloc(trans->cache, size+sizeof(objheader_t)); - //Need to assign OID - - return newobj; -} - /* This function starts up the transaction runtime. */ int dstmStartup(const char * option) { pthread_t thread_Listen; pthread_attr_t attr; int master=strcmp(option, "master")==0; + myIpAddr = getMyIpAddr("eth0"); + dstmInit(); transInit(); @@ -180,7 +173,6 @@ transrecord_t *transStart() /* This function finds the location of the objects involved in a transaction * and returns the pointer to the object if found in a remote location */ objheader_t *transRead(transrecord_t *record, unsigned int oid) { - printf("Inside transaction read call\n"); unsigned int machinenumber; objheader_t *tmp, *objheader; void *objcopy; @@ -196,11 +188,9 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { /* Search local transaction cache */ if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ - printf("Inside transaction cache \n"); return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { /* Look up in machine lookup table and copy into cache*/ - printf("Inside mainobject store \n"); tmp = mhashSearch(oid); size = sizeof(objheader_t)+classsize[TYPE(tmp)]; objcopy = objstrAlloc(record->cache, size); @@ -209,7 +199,6 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { chashInsert(record->lookupTable, OID(objheader), objcopy); return(objcopy); } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */ - printf("Inside prefetch cache \n"); found = 1; size = sizeof(objheader_t)+classsize[TYPE(tmp)]; objcopy = objstrAlloc(record->cache, size); @@ -262,7 +251,7 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) OID(tmp) = getNewOID(); TYPE(tmp) = type; tmp->version = 1; - tmp->rcount = 0; //? not sure how to handle this yet + tmp->rcount = 1; STATUS(tmp) = NEW; chashInsert(record->lookupTable, OID(tmp), tmp); return tmp; @@ -278,7 +267,7 @@ plistnode_t *createPiles(transrecord_t *record) { unsigned int machinenum; void *localmachinenum; objheader_t *headeraddr; - + ptr = record->lookupTable->table; size = record->lookupTable->size; @@ -291,30 +280,26 @@ plistnode_t *createPiles(transrecord_t *record) { break; } next = curr->next; - //Get machine location for object id - //TODO Check is the object is newly created ...if not then lookup the location table - if ((machinenum = lhashSearch(curr->key)) == 0) { - printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); + if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { + printf("Error: No such oid %s, %d\n", __FILE__, __LINE__); return NULL; } - if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { - printf("Error: No such oid %s, %d\n", __FILE__, __LINE__); + //Get machine location for object id (and whether local or not) + if (STATUS(headeraddr) & NEW || mhashSearch(curr->key) != NULL) { + machinenum = myIpAddr; + } else if ((machinenum = lhashSearch(curr->key)) == 0) { + printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); return NULL; } + //Make machine groups if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) { printf("pInsert error %s, %d\n", __FILE__, __LINE__); return NULL; } - /* Check if local or not */ - if((localmachinenum = mhashSearch(curr->key)) != NULL) { - /* Set the pile->local flag*/ - pile->local = 1; //True i.e. local - } - curr = next; } } @@ -339,126 +324,125 @@ int transCommit(transrecord_t *record) { char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */ char localstat = 0; + do { - /* Look through all the objects in the transaction record and make piles - * for each machine involved in the transaction*/ - pile_ptr = pile = createPiles(record); - - /* Create the packet to be sent in TRANS_REQUEST */ + /* Look through all the objects in the transaction record and make piles + * for each machine involved in the transaction*/ + pile_ptr = pile = createPiles(record); - /* Count the number of participants */ - pilecount = pCount(pile); + /* Create the packet to be sent in TRANS_REQUEST */ - /* Create a list of machine ids(Participants) involved in transaction */ - if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; - } - pListMid(pile, listmid); + /* Count the number of participants */ + pilecount = pCount(pile); - - /* Initialize thread variables, - * Spawn a thread for each Participant involved in a transaction */ - pthread_t thread[pilecount]; - pthread_attr_t attr; - pthread_cond_t tcond; - pthread_mutex_t tlock; - pthread_mutex_t tlshrd; - - thread_data_array_t *thread_data_array; - thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount); - local_thread_data_array_t *ltdata; - if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; - } - - thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ - - /* Initialize and set thread detach attribute */ - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_mutex_init(&tlock, NULL); - pthread_cond_init(&tcond, NULL); - - /* Process each machine pile */ - while(pile != NULL) { - //Create transaction id - newtid++; - if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { + /* Create a list of machine ids(Participants) involved in transaction */ + if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + pListMid(pile, listmid); + + + /* Initialize thread variables, + * Spawn a thread for each Participant involved in a transaction */ + pthread_t thread[pilecount]; + pthread_attr_t attr; + pthread_cond_t tcond; + pthread_mutex_t tlock; + pthread_mutex_t tlshrd; + + thread_data_array_t *thread_data_array; + thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount); + local_thread_data_array_t *ltdata; + if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); return 1; } - tosend->f.control = TRANS_REQUEST; - sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); - tosend->f.mcount = pilecount; - tosend->f.numread = pile->numread; - tosend->f.nummod = pile->nummod; - tosend->f.sum_bytes = pile->sum_bytes; - tosend->listmid = listmid; - tosend->objread = pile->objread; - tosend->oidmod = pile->oidmod; - thread_data_array[threadnum].thread_id = threadnum; - thread_data_array[threadnum].mid = pile->mid; - thread_data_array[threadnum].pilecount = pilecount; - thread_data_array[threadnum].buffer = tosend; - thread_data_array[threadnum].recvmsg = rcvd_control_msg; - thread_data_array[threadnum].threshold = &tcond; - thread_data_array[threadnum].lock = &tlock; - thread_data_array[threadnum].count = &trecvcount; - thread_data_array[threadnum].replyctrl = &treplyctrl; - thread_data_array[threadnum].replyretry = &treplyretry; - thread_data_array[threadnum].rec = record; - /* If local do not create any extra connection */ - if(pile->local != 1) { /* Not local */ - rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]); - if (rc) { - perror("Error in pthread create\n"); + + thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ + + /* Initialize and set thread detach attribute */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_mutex_init(&tlock, NULL); + pthread_cond_init(&tcond, NULL); + + /* Process each machine pile */ + while(pile != NULL) { + //Create transaction id + newtid++; + if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); return 1; } - } else { /*Local*/ - /*Unset the pile->local flag*/ - pile->local = 0; - /*Set flag to identify that Local machine is involved*/ - ltdata->tdata = &thread_data_array[threadnum]; - ltdata->transinfo = &transinfo; - val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata); - if (val) { - perror("Error in pthread create\n"); - return 1; + tosend->f.control = TRANS_REQUEST; + sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); + tosend->f.mcount = pilecount; + tosend->f.numread = pile->numread; + tosend->f.nummod = pile->nummod; + tosend->f.numcreated = pile->numcreated; + tosend->f.sum_bytes = pile->sum_bytes; + tosend->listmid = listmid; + tosend->objread = pile->objread; + tosend->oidmod = pile->oidmod; + tosend->oidcreated = pile->oidcreated; + thread_data_array[threadnum].thread_id = threadnum; + thread_data_array[threadnum].mid = pile->mid; + thread_data_array[threadnum].pilecount = pilecount; + thread_data_array[threadnum].buffer = tosend; + thread_data_array[threadnum].recvmsg = rcvd_control_msg; + thread_data_array[threadnum].threshold = &tcond; + thread_data_array[threadnum].lock = &tlock; + thread_data_array[threadnum].count = &trecvcount; + thread_data_array[threadnum].replyctrl = &treplyctrl; + thread_data_array[threadnum].replyretry = &treplyretry; + thread_data_array[threadnum].rec = record; + /* If local do not create any extra connection */ + if(pile->mid != myIpAddr) { /* Not local */ + rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]); + if (rc) { + perror("Error in pthread create\n"); + return 1; + } + } else { /*Local*/ + ltdata->tdata = &thread_data_array[threadnum]; + ltdata->transinfo = &transinfo; + val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata); + if (val) { + perror("Error in pthread create\n"); + return 1; + } } + threadnum++; + pile = pile->next; } - threadnum++; - pile = pile->next; - } - /* Free attribute and wait for the other threads */ - pthread_attr_destroy(&attr); - for (i = 0 ;i < pilecount ; i++) { - rc = pthread_join(thread[i], NULL); - if (rc) - { - printf("ERROR return code from pthread_join() is %d\n", rc); - return 1; + /* Free attribute and wait for the other threads */ + pthread_attr_destroy(&attr); + for (i = 0 ;i < pilecount ; i++) { + rc = pthread_join(thread[i], NULL); + if (rc) + { + printf("ERROR return code from pthread_join() is %d\n", rc); + return 1; + } + free(thread_data_array[i].buffer); } - free(thread_data_array[i].buffer); - } - /* Free resources */ - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - free(listmid); - pDelete(pile_ptr); - free(thread_data_array); - free(ltdata); + /* Free resources */ + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + free(listmid); + pDelete(pile_ptr); + free(thread_data_array); + free(ltdata); - /* Retry trans commit procedure if not sucessful in the first try */ - if(treplyretry == 1) { /* wait a random amount of time */ - randomdelay(); - /* Retry the commiting transaction again */ - transCommit(record); - } + if (treplyretry == 1) + randomdelay(); + + /* Retry trans commit procedure if not sucessful in the first try */ + } while (treplyretry == 1); return 0; } @@ -584,17 +568,14 @@ int decideResponse(thread_data_array_t *tdata) { written onto the shared array */ switch(control) { case TRANS_DISAGREE: - printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n"); transdisagree++; break; case TRANS_AGREE: - printf("DEBUG-> trans.c Recv TRANS_AGREE\n"); transagree++; break; case TRANS_SOFT_ABORT: - printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n"); transsoftabort++; break; default: @@ -606,7 +587,6 @@ int decideResponse(thread_data_array_t *tdata) { /* Send Abort */ if(transdisagree > 0) { *(tdata->replyctrl) = TRANS_ABORT; - printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); /* Free resources */ objstrDelete(tdata->rec->cache); chashDelete(tdata->rec->lookupTable); @@ -614,7 +594,6 @@ int decideResponse(thread_data_array_t *tdata) { } else if(transagree == tdata->pilecount){ /* Send Commit */ *(tdata->replyctrl) = TRANS_COMMIT; - printf("DEBUG-> trans.c Sending TRANS_COMMIT\n"); /* Free resources */ objstrDelete(tdata->rec->cache); chashDelete(tdata->rec->lookupTable); @@ -623,9 +602,7 @@ int decideResponse(thread_data_array_t *tdata) { /* Send Abort in soft abort case followed by retry commiting transaction again*/ *(tdata->replyctrl) = TRANS_ABORT; *(tdata->replyretry) = 1; - printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); } else { - printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__); return -1; } @@ -716,7 +693,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { } switch(control) { case OBJECT_NOT_FOUND: - printf("DEBUG -> Control OBJECT_NOT_FOUND received\n"); return NULL; case OBJECT_FOUND: /* Read object if found into local cache */ @@ -752,7 +728,7 @@ void *handleLocalReq(void *threadarg) { short version; char control = 0, *ptr; unsigned int oid; - unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL; + unsigned int *oidnotfound = NULL, *oidlocked = NULL; void *mobj, *modptr; objheader_t *headptr, *headeraddr; local_thread_data_array_t *localtdata; @@ -778,6 +754,13 @@ void *handleLocalReq(void *threadarg) { memcpy(modptr+offset, headeraddr, size); offset += size; } + /* Write new objects into the mainobject store */ + for(i = 0; i< localtdata->tdata->buffer->f.numcreated; i++) { + headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]); + size = sizeof(objheader_t) + classsize[TYPE(headeraddr)]; + memcpy(modptr+offset, headeraddr, size); + offset += size; + } ptr = modptr; offset = 0; //Reset @@ -813,7 +796,6 @@ void *handleLocalReq(void *threadarg) { v_nomatch++; /* Send TRANS_DISAGREE to Coordinator */ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; - printf("DEBUG -> Sending TRANS_DISAGREE\n"); } } else {/* If Obj is not locked then lock object */ STATUS(((objheader_t *)mobj)) |= LOCK; @@ -829,7 +811,6 @@ void *handleLocalReq(void *threadarg) { v_nomatch++; /* Send TRANS_DISAGREE to Coordinator */ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; - printf("DEBUG -> Sending TRANS_DISAGREE\n"); } } } @@ -838,12 +819,10 @@ void *handleLocalReq(void *threadarg) { /* Condition to send TRANS_AGREE */ if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) { localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; - printf("DEBUG -> Sending TRANS_AGREE\n"); } /* Condition to send TRANS_SOFT_ABORT */ if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT; - printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); //TODO currently the only soft abort case that is supported is when object locked by previous //transaction => v_matchlock > 0 //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported @@ -888,15 +867,13 @@ void *handleLocalReq(void *threadarg) { return NULL; } }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){ - if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->transinfo->numlocked) != 0) { + if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) { printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); return NULL; } } /* Free memory */ - printf("DEBUG -> Freeing...\n"); - fflush(stdout); if (localtdata->transinfo->objlocked != NULL) { free(localtdata->transinfo->objlocked); @@ -917,12 +894,11 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int objheader_t *tmp_header; void *header; - printf("DEBUG -> Recv TRANS_ABORT\n"); /* Set all ref counts as 1 and do garbage collection */ ptr = (char *)modptr; for(i = 0; i< nummod; i++) { tmp_header = (objheader_t *)ptr; - tmp_header->rcount = 1; + tmp_header->rcount = 0; ptr += sizeof(objheader_t) + classsize[TYPE(tmp_header)]; } /* Unlock objects that was locked due to this transaction */ @@ -935,7 +911,6 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int } /* Send ack to Coordinator */ - printf("DEBUG-> TRANS_SUCCESSFUL\n"); /*Free the pointer */ ptr = NULL; @@ -944,7 +919,7 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int /*This function completes the COMMIT process is the transaction is commiting */ -int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked, int nummod, int numlocked) { +int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated, unsigned int *objlocked, int nummod, int numcreated, int numlocked) { objheader_t *header; int i = 0, offset = 0; char control; @@ -956,10 +931,10 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked, return 1; } /* Change reference count of older address and free space in objstr ?? */ - header->rcount = 1; //TODO Not sure what would be the val + header->rcount = 0; /* Change ptr address in mhash table */ - mhashRemove(oidmod[i]); + mhashRemove(oidmod[i]); //TODO: this shouldn't be necessary mhashInsert(oidmod[i], (((char *)modptr) + offset)); offset += sizeof(objheader_t) + classsize[TYPE(header)]; @@ -968,6 +943,15 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked, header->version += 1; } + for (i = 0; i < numcreated; i++) + { + header = (objheader_t *)(((char *)modptr) + offset); + mhashInsert(oidcreated[i], (((char *)modptr) + offset)); + offset += sizeof(objheader_t) + classsize[TYPE(header)]; + + lhashInsert(oidcreated[i], myIpAddr); + } + /* Unlock locked objects */ for(i = 0; i < numlocked; i++) { if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) { @@ -980,7 +964,6 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked, //TODO Update location lookup table /* Send ack to Coordinator */ - printf("DEBUG-> TRANS_SUCESSFUL\n"); return 0; }