From e2b4c06112fc83db8a33696a500732b903b3fbde Mon Sep 17 00:00:00 2001 From: adash Date: Mon, 22 Sep 2008 22:10:02 +0000 Subject: [PATCH] get rid of unnecessary recv_data , SUCCESSFUL/UNSUCESSFUL ctrl msg add new debug info --- Robust/src/Runtime/DSTM/interface/dsmdebug.h | 22 ++++++ .../src/Runtime/DSTM/interface/dstmserver.c | 15 +--- Robust/src/Runtime/DSTM/interface/trans.c | 69 ++++--------------- 3 files changed, 39 insertions(+), 67 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dsmdebug.h b/Robust/src/Runtime/DSTM/interface/dsmdebug.h index c95695a4..0dd0e691 100644 --- a/Robust/src/Runtime/DSTM/interface/dsmdebug.h +++ b/Robust/src/Runtime/DSTM/interface/dsmdebug.h @@ -1,6 +1,8 @@ #ifndef _DSMDEBUG_H_ #define _DSMDEBUG_H_ +#include + #define TABORT1(s) {printf("%s\n", s); fflush(stdout);} #define TABORT2(s, msg) {printf("%s(): %s\n", s, msg); fflush(stdout);} #define TABORT3(func, s, msg, d) {printf("%s(): %s: for %s = %d\n", func, s, msg, d); fflush(stdout);} @@ -11,5 +13,25 @@ #define TABORT8(func, s, d) {printf("%s(): %s = %d\n", func, s, d); fflush(stdout);} #define TABORT9(func, a, b, c, d, val1, val2, val3) {printf("%s(): %s for %s =%x, %s = %d, %s = %x\n", func, a, b, val1, c, val2, d, val3); fflush(stdout);} +#define ARRAY_SIZE 10100 +#define GETSTARTDELAY(start, count) { \ + struct timeval tv; \ + count++; \ + gettimeofday(&tv, NULL); \ + start = tv.tv_sec+(tv.tv_usec/1000000.0); \ +} + +#define GETSTART(start) { \ + struct timeval tv; \ + gettimeofday(&tv, NULL); \ + start = tv.tv_sec+(tv.tv_usec/1000000.0); \ +} + +#define GETENDDELAY(start, end, time) { \ + struct timeval tv; \ + gettimeofday(&tv, NULL); \ + end = tv.tv_sec+(tv.tv_usec/1000000.0); \ + time = (end-start); \ +} #endif diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index b5314303..1e0cac6d 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -24,6 +24,7 @@ pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a sockPoolHashTable_t *transPResponseSocketPool; + /* This function initializes the main objects store and creates the * global machine and location lookup table */ @@ -353,9 +354,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__); return 1; } - recv_data((int)acceptfd, &control, sizeof(char)); - /* Process the new control message */ switch(control) { case TRANS_ABORT: @@ -378,10 +377,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, read_unlock(STATUSPTR(header)); } } - - /* Send ack to Coordinator */ - sendctrl = TRANS_UNSUCESSFUL; - send_data((int)acceptfd, &sendctrl, sizeof(char)); break; case TRANS_COMMIT: @@ -407,7 +402,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, //TODO Use fixed.trans_id TID since Client may have died break; } - /* Free memory */ if (transinfo->objlocked != NULL) { free(transinfo->objlocked); @@ -443,7 +437,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne /* Process each oid in the machine pile/ group per thread */ for (i = 0; i < fixed->numread + fixed->nummod; i++) { if (i < fixed->numread) { //Objs only read and not modified - int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array + int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array incr *= i; oid = *((unsigned int *)(objread + incr)); incr += sizeof(unsigned int); @@ -518,7 +512,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__); return 0; } - return control; } @@ -706,10 +699,6 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock } } //TODO Update location lookup table - - /* Send ack to coordinator */ - control = TRANS_SUCESSFUL; - send_data((int)acceptfd, &control, sizeof(char)); return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 36fdcf1e..ee4c9295 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -19,6 +19,7 @@ #define PREFETCH_CACHE_SIZE 1048576 //1MB #define CONFIG_FILENAME "dstm.conf" + /* Global Variables */ extern int classsize[]; pfcstats_t *evalPrefetch; @@ -481,6 +482,7 @@ int transCommit(transrecord_t *record) { local_thread_data_array_t *ltdata; int firsttime=1; + do { treplyctrl=0; trecvcount = 0; @@ -496,7 +498,6 @@ int transCommit(transrecord_t *record) { else pile=pile_ptr; firsttime=0; - /* Create the packet to be sent in TRANS_REQUEST */ /* Count the number of participants */ @@ -592,9 +593,9 @@ int transCommit(transrecord_t *record) { threadnum++; pile = pile->next; } + /* Free attribute and wait for the other threads */ pthread_attr_destroy(&attr); - for (i = 0; i < threadnum; i++) { rc = pthread_join(thread[i], NULL); if(rc) { @@ -610,7 +611,6 @@ int transCommit(transrecord_t *record) { } free(thread_data_array[i].buffer); } - /* Free resources */ pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); @@ -675,7 +675,6 @@ void *transRequest(void *threadarg) { char machineip[16], retval; tdata = (thread_data_array_t *) threadarg; - if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) { printf("transRequest(): socket create error\n"); pthread_exit(NULL); @@ -697,6 +696,12 @@ void *transRequest(void *threadarg) { } /* Send objects that are modified */ + void *modptr; + if((modptr = calloc(1, tdata->buffer->f.sum_bytes)) == NULL) { + printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__); + pthread_exit(NULL); + } + int offset = 0; for(i = 0; i < tdata->buffer->f.nummod ; i++) { int size; if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) { @@ -705,11 +710,14 @@ void *transRequest(void *threadarg) { } GETSIZE(size,headeraddr); size+=sizeof(objheader_t); - send_data(sd, headeraddr, size); + memcpy(modptr+offset, headeraddr, size); + offset+=size; } - + send_data(sd, modptr, tdata->buffer->f.sum_bytes); + free(modptr); /* Read control message from Participant */ recv_data(sd, &control, sizeof(char)); + /* Recv Objects if participant sends TRANS_DISAGREE */ #ifdef CACHE if(control == TRANS_DISAGREE) { @@ -746,7 +754,6 @@ void *transRequest(void *threadarg) { } } #endif - recvcontrol = control; /* Update common data structure and increment count */ tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; @@ -766,29 +773,6 @@ void *transRequest(void *threadarg) { } pthread_mutex_unlock(tdata->lock); - /* clear objects from prefetch cache */ - /* - if(*(tdata->replyctrl) == TRANS_ABORT) { - int i; - for(i=0; ibuffer->f.nummod; i++) { - unsigned int oid = tdata->buffer->oidmod[i]; - objheader_t *header; - if((header = prehashSearch(oid)) != NULL) { - prehashRemove(oid); - } - } - for(i=0; ibuffer->f.numread; i++) { - char *objread = tdata->buffer->objread; - unsigned int oid = *((unsigned int *)(objread+(sizeof(unsigned int) + - sizeof(unsigned short))*i)); - objheader_t *header; - if((header = prehashSearch(oid)) != NULL) { - prehashRemove(oid); - } - } - } - */ - #ifdef CACHE if(*(tdata->replyctrl) == TRANS_COMMIT) { int retval; @@ -814,16 +798,6 @@ void *transRequest(void *threadarg) { printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); pthread_exit(NULL); } - - recv_data((int)sd, &control, sizeof(char)); - - if(control == TRANS_UNSUCESSFUL) { - //printf("DEBUG-> TRANS_ABORTED\n"); - } else if(control == TRANS_SUCESSFUL) { - //printf("DEBUG-> TRANS_SUCCESSFUL\n"); - } else { - //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control); - } pthread_exit(NULL); } @@ -833,7 +807,6 @@ void decideResponse(thread_data_array_t *tdata) { char control; int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what message to send */ - for (i = 0 ; i < tdata->buffer->f.mcount; i++) { control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses written onto the shared array */ @@ -868,14 +841,6 @@ void decideResponse(thread_data_array_t *tdata) { /* Send Commit */ *(tdata->replyctrl) = TRANS_COMMIT; *(tdata->replyretry) = 0; -#ifdef CACHE -#if 0 - /* Turn prefetching on */ - int i; - for (i=0; ireplyctrl) = TRANS_ABORT; @@ -908,7 +873,6 @@ char sendResponse(thread_data_array_t *tdata, int sd) { /* If the decided response is TRANS_COMMIT */ retval = TRANS_COMMIT; } - return retval; } @@ -964,9 +928,7 @@ void *handleLocalReq(void *threadarg) { int numread, i; unsigned int oid; unsigned short version; - localtdata = (local_thread_data_array_t *) threadarg; - /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for @@ -1030,7 +992,6 @@ void *handleLocalReq(void *threadarg) { pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock); } pthread_mutex_unlock(localtdata->tdata->lock); - if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) { if(transAbortProcess(localtdata) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); @@ -1054,6 +1015,7 @@ void *handleLocalReq(void *threadarg) { pthread_exit(NULL); } } + /* Free memory */ if (localtdata->transinfo->objlocked != NULL) { free(localtdata->transinfo->objlocked); @@ -1062,7 +1024,6 @@ void *handleLocalReq(void *threadarg) { free(localtdata->transinfo->objnotfound); } pthread_exit(NULL); - } /* Commit info for objects modified */ -- 2.34.1