From 6a34b2001ac8a0172f88a1788123166a07badd82 Mon Sep 17 00:00:00 2001 From: erubow Date: Wed, 19 Sep 2007 02:10:04 +0000 Subject: [PATCH] various bug fixes. --- Robust/src/Runtime/DSTM/interface/dstm.h | 2 +- .../src/Runtime/DSTM/interface/dstmserver.c | 33 ++++--- Robust/src/Runtime/DSTM/interface/mcpileq.c | 24 ----- Robust/src/Runtime/DSTM/interface/mcpileq.h | 1 - Robust/src/Runtime/DSTM/interface/queue.c | 1 - Robust/src/Runtime/DSTM/interface/trans.c | 97 +++++++++---------- 6 files changed, 71 insertions(+), 87 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index d4ea42b7..9ef13b5a 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -227,7 +227,7 @@ objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid int transCommit(transrecord_t *record); //return 0 if successful void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins void *handleLocalReq(void *); //the C routine that the local m/c thread will execute -int decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant +void decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); int transAbortProcess(void *, unsigned int *, int, int); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 2d293577..2cea115c 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -94,7 +94,6 @@ void *dstmListen() acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); } - pthread_exit(NULL); } /* This function accepts a new connection request, decodes the control message in the connection * and accordingly calls other functions to process new requests */ @@ -109,15 +108,21 @@ void *dstmAccept(void *acceptfd) trans_commit_data_t transinfo; unsigned short objType; + transinfo.objlocked = NULL; + transinfo.objnotfound = NULL; + transinfo.modptr = NULL; + transinfo.numlocked = 0; + transinfo.numnotfound = 0; + int fd_flags = fcntl((int)acceptfd, F_GETFD), size; /* Receive control messages from other machines */ if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) { if (retval == 0) { - return; // Testing connection + pthread_exit(NULL); // Testing connection } perror("Error in receiving control from coordinator\n"); - return; + pthread_exit(NULL); } switch(control) { @@ -125,7 +130,7 @@ void *dstmAccept(void *acceptfd) /* Read oid requested and search if available */ if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) { perror("Error receiving object from cooridnator\n"); - return NULL; + pthread_exit(NULL); } if((srcObj = mhashSearch(oid)) == NULL) { printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__); @@ -138,7 +143,7 @@ void *dstmAccept(void *acceptfd) ctrl = OBJECT_NOT_FOUND; if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error sending control msg to coordinator\n"); - return NULL; + pthread_exit(NULL); } } else { /* Type */ @@ -146,11 +151,11 @@ void *dstmAccept(void *acceptfd) *((int *)&msg[1])=size; if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) { perror("Error sending size of object to coordinator\n"); - return NULL; + pthread_exit(NULL); } if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) { perror("Error in sending object\n"); - return NULL; + pthread_exit(NULL); } } break; @@ -172,14 +177,14 @@ void *dstmAccept(void *acceptfd) printf("DEBUG -> Recv TRANS_REQUEST\n"); if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { printf("Error in readClientReq\n"); - return; + pthread_exit(NULL); } break; case TRANS_PREFETCH: printf("DEBUG -> Recv TRANS_PREFETCH\n"); if((val = prefetchReq((int)acceptfd)) != 0) { printf("Error in readClientReq\n"); - return; + pthread_exit(NULL); } break; case START_REMOTE_THREAD: @@ -221,6 +226,8 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { objheader_t *headaddr; int sum = 0, i, N, n, val; + oidmod = NULL; + /* Read fixed_data_t data structure */ N = sizeof(fixed) - 1; ptr = (char *)&fixed;; @@ -274,6 +281,11 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) { /* Create an array of oids for modified objects */ oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int)); + if (oidmod == NULL) + { + printf("calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } ptr = (char *) modptr; for(i = 0 ; i < fixed.nummod; i++) { int tmpsize; @@ -416,7 +428,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne short version; char control = 0, *ptr; unsigned int oid; - unsigned int *oidnotfound, *oidlocked, *oidmod; + unsigned int *oidnotfound, *oidlocked; void *mobj; objheader_t *headptr; @@ -452,7 +464,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ /* Save the oids not found and number of oids not found for later use */ - //oidnotfound[objnotfound] = OID(((objheader_t *)mobj)); oidnotfound[objnotfound] = oid; objnotfound++; } else { /* If Obj found in machine (i.e. has not moved) */ diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c index 0555e8c4..38941ef5 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.c +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.c @@ -27,12 +27,6 @@ void mcpileenqueue(prefetchpile_t *node) { prefetchpile_t *tmp, *prev; if(mcqueue.front == NULL && mcqueue.rear == NULL) { mcqueue.front = mcqueue.rear = node; - /*tmp = mcqueue.front = node; - while(tmp != NULL) { - prev = tmp; - tmp = tmp->next; - } - mcqueue.rear = prev;*/ } else { tmp = mcqueue.rear->next = node; while(tmp != NULL) { @@ -59,24 +53,6 @@ prefetchpile_t *mcpiledequeue(void) { return retnode; } -/* Delete the node pointed to by the front ptr of the queue */ -void delnode() { - prefetchpile_t *delnode; - if((mcqueue.front == NULL) && (mcqueue.rear == NULL)) { - printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__); - return; - } else if ((mcqueue.front == mcqueue.rear) && mcqueue.front != NULL && mcqueue.rear != NULL) { - printf("TEST1\n"); - free(mcqueue.front); - mcqueue.front = mcqueue.rear = NULL; - } else { - delnode = mcqueue.front; - mcqueue.front = mcqueue.front->next; - printf("TEST2\n"); - free(delnode); - } -} - void mcpiledelete(void) { /* Remove each element */ while(mcqueue.front != NULL) diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.h b/Robust/src/Runtime/DSTM/interface/mcpileq.h index 93e20630..8291c591 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.h +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.h @@ -30,7 +30,6 @@ typedef struct mcpileq { void mcpileqInit(void); void mcpileenqueue(prefetchpile_t *); prefetchpile_t *mcpiledequeue(void); -void delnode(); void mcpiledelete(); void mcpiledisplay(); void mcdealloc(prefetchpile_t *); diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index 373de749..7a8fbba3 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -74,7 +74,6 @@ void queueDisplay() { void predealloc(prefetchqelem_t *node) { free(node); - node->next = NULL; } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 2b0b5fbc..c50a10b0 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -351,8 +351,8 @@ plistnode_t *createPiles(transrecord_t *record) { int transCommit(transrecord_t *record) { unsigned int tot_bytes_mod, *listmid; plistnode_t *pile, *pile_ptr; - int i, rc, val; - int pilecount = 0, offset, threadnum = 0, trecvcount = 0, tmachcount = 0; + int i, j, rc, val; + int pilecount, offset, threadnum, trecvcount; char buffer[RECEIVE_BUFFER_SIZE],control; char transid[TID_LEN]; trans_req_data_t *tosend; @@ -362,6 +362,8 @@ int transCommit(transrecord_t *record) { char localstat = 0; do { + trecvcount = 0; + threadnum = 0; /* Look through all the objects in the transaction record and make piles * for each machine involved in the transaction*/ @@ -375,6 +377,7 @@ int transCommit(transrecord_t *record) { /* Create a list of machine ids(Participants) involved in transaction */ if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); + free(record); return 1; } pListMid(pile, listmid); @@ -395,6 +398,7 @@ int transCommit(transrecord_t *record) { pthread_mutex_destroy(&tlock); pDelete(pile_ptr); free(listmid); + free(record); return 1; } @@ -406,6 +410,7 @@ int transCommit(transrecord_t *record) { pDelete(pile_ptr); free(listmid); free(thread_data_array); + free(record); return 1; } @@ -429,6 +434,7 @@ int transCommit(transrecord_t *record) { free(listmid); free(thread_data_array); free(ltdata); + free(record); return 1; } tosend->f.control = TRANS_REQUEST; @@ -454,31 +460,35 @@ int transCommit(transrecord_t *record) { thread_data_array[threadnum].rec = record; /* If local do not create any extra connection */ if(pile->mid != myIpAddr) { /* Not local */ - rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]); + rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]); if(rc) { perror("Error in pthread create\n"); pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); pDelete(pile_ptr); free(listmid); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); free(thread_data_array); free(ltdata); - free(tosend); + free(record); return 1; } } else { /*Local*/ ltdata->tdata = &thread_data_array[threadnum]; ltdata->transinfo = &transinfo; - val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata); + val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata); if(val) { perror("Error in pthread create\n"); pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); pDelete(pile_ptr); free(listmid); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); free(thread_data_array); free(ltdata); - free(tosend); + free(record); return 1; } } @@ -499,9 +509,11 @@ int transCommit(transrecord_t *record) { pthread_mutex_destroy(&tlock); pDelete(pile_ptr); free(listmid); + for (j = i; j < pilecount; j++) + free(thread_data_array[j].buffer); free(thread_data_array); free(ltdata); - free(tosend); + free(record); return 1; } free(thread_data_array[i].buffer); @@ -511,13 +523,10 @@ int transCommit(transrecord_t *record) { /* Free resources */ pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); - if(listmid != NULL) - free(listmid); + free(listmid); pDelete(pile_ptr); - if(thread_data_array != NULL) - free(thread_data_array); - if(ltdata != NULL) - free(ltdata); + free(thread_data_array); + free(ltdata); /* wait a random amount of time */ if (treplyretry == 1) @@ -525,7 +534,8 @@ int transCommit(transrecord_t *record) { /* Retry trans commit procedure if not sucessful in the first try */ } while (treplyretry == 1); - + + free(record); return 0; } @@ -548,7 +558,6 @@ void *transRequest(void *threadarg) { if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Error in socket for TRANS_REQUEST\n"); pthread_exit(NULL); - return NULL; } bzero((char*) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; @@ -559,24 +568,24 @@ void *transRequest(void *threadarg) { /* Open Connection */ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { perror("Error in connect for TRANS_REQUEST\n"); + close(sd); pthread_exit(NULL); - return NULL; } printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip); /* Send bytes of data with TRANS_REQUEST control message */ if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) { perror("Error sending fixed bytes for thread\n"); + close(sd); pthread_exit(NULL); - return NULL; } /* Send list of machines involved in the transaction */ { int size=sizeof(unsigned int)*tdata->buffer->f.mcount; if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) { perror("Error sending list of machines for thread\n"); + close(sd); pthread_exit(NULL); - return NULL; } } /* Send oids and version number tuples for objects that are read */ @@ -584,8 +593,8 @@ void *transRequest(void *threadarg) { int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread; if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) { perror("Error sending tuples for thread\n"); + close(sd); pthread_exit(NULL); - return NULL; } } /* Send objects that are modified */ @@ -596,16 +605,16 @@ void *transRequest(void *threadarg) { size+=sizeof(objheader_t); if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) { perror("Error sending obj modified for thread\n"); + close(sd); pthread_exit(NULL); - return NULL; } } /* Read control message from Participant */ if((n = read(sd, &control, sizeof(char))) <= 0) { perror("Error in reading control message from Participant\n"); + close(sd); pthread_exit(NULL); - return NULL; } recvcontrol = control; @@ -620,12 +629,7 @@ void *transRequest(void *threadarg) { /* Wake up the threads and invoke decideResponse (once) */ if(*(tdata->count) == tdata->buffer->f.mcount) { - if (decideResponse(tdata) != 0) { - printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(tdata->lock); - pthread_exit(NULL); - return NULL; - } + decideResponse(tdata); pthread_cond_broadcast(tdata->threshold); } else { pthread_cond_wait(tdata->threshold, tdata->lock); @@ -636,9 +640,8 @@ void *transRequest(void *threadarg) { * to all participants in their respective socket */ if (sendResponse(tdata, sd) == 0) { printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(tdata->lock); + close(sd); pthread_exit(NULL); - return NULL; } /* Close connection */ @@ -648,7 +651,7 @@ void *transRequest(void *threadarg) { /* This function decides the reponse that needs to be sent to * all Participant machines after the TRANS_REQUEST protocol */ -int decideResponse(thread_data_array_t *tdata) { +void decideResponse(thread_data_array_t *tdata) { char control; int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what message to send */ @@ -657,6 +660,9 @@ int decideResponse(thread_data_array_t *tdata) { control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses written onto the shared array */ switch(control) { + default: + printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); + /* treat as disagree, pass thru */ case TRANS_DISAGREE: transdisagree++; break; @@ -668,9 +674,6 @@ int decideResponse(thread_data_array_t *tdata) { case TRANS_SOFT_ABORT: transsoftabort++; break; - default: - printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); - return -1; } } @@ -680,23 +683,19 @@ int decideResponse(thread_data_array_t *tdata) { /* Free resources */ objstrDelete(tdata->rec->cache); chashDelete(tdata->rec->lookupTable); - free(tdata->rec); } else if(transagree == tdata->buffer->f.mcount){ /* Send Commit */ *(tdata->replyctrl) = TRANS_COMMIT; /* Free resources */ objstrDelete(tdata->rec->cache); chashDelete(tdata->rec->lookupTable); - free(tdata->rec); - } else if(transsoftabort > 0 && transdisagree == 0) { + } else { /* (transsoftabort > 0 && transdisagree == 0) */ /* Send Abort in soft abort case followed by retry commiting transaction again*/ *(tdata->replyctrl) = TRANS_ABORT; *(tdata->replyretry) = 1; - } else { - return -1; } - return 0; + return; } /* This function sends the final response to remote machines per thread in their respective socket id */ char sendResponse(thread_data_array_t *tdata, int sd) { @@ -837,7 +836,7 @@ void *handleLocalReq(void *threadarg) { if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) { printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__); pthread_mutex_unlock(&mainobjstore_mutex); - return NULL; + pthread_exit(NULL); } pthread_mutex_unlock(&mainobjstore_mutex); /* Write modified objects into the mainobject store */ @@ -946,11 +945,7 @@ void *handleLocalReq(void *threadarg) { /* Wake up the threads and invoke decideResponse (once) */ if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) { - if (decideResponse(localtdata->tdata) != 0) { - printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(localtdata->tdata->lock); - return NULL; - } + decideResponse(localtdata->tdata); pthread_cond_broadcast(localtdata->tdata->threshold); } else { pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock); @@ -961,12 +956,12 @@ void *handleLocalReq(void *threadarg) { if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){ if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); - return NULL; + pthread_exit(NULL); } }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){ if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) { printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); - return NULL; + pthread_exit(NULL); } } @@ -1328,7 +1323,7 @@ void *transPrefetch(void *t) { /* dequeue node to create a machine piles and finally unlock mutex */ if((qnode = pre_dequeue()) == NULL) { printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); - return NULL; + pthread_exit(NULL); } pthread_mutex_unlock(&pqueue.qlock); /* Reduce redundant prefetch requests */ @@ -1371,7 +1366,7 @@ void *mcqProcess(void *threadid) { /* Dequeue node to send remote machine connections*/ if((mcpilenode = mcpiledequeue()) == NULL) { printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__); - return NULL; + pthread_exit(NULL); } /* Unlock mutex */ pthread_mutex_unlock(&mcqueue.qlock); @@ -1409,6 +1404,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { /* Open Connection */ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { perror("Error in connect for TRANS_REQUEST\n"); + close(sd); return; } @@ -1416,6 +1412,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { control = TRANS_PREFETCH; if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error in sending prefetch control\n"); + close(sd); return; } @@ -1436,6 +1433,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { } if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) { perror("Error sending fixed bytes for thread\n"); + close(sd); return; } tmp = tmp->next; @@ -1445,6 +1443,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { endpair = -1; if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) { perror("Error sending fixed bytes for thread\n"); + close(sd); return; } -- 2.34.1