From: adash Date: Thu, 6 Mar 2008 23:45:33 +0000 (+0000) Subject: changed the prefetch request send and prefetch response receive design X-Git-Tag: preEdgeChange~237 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=bcb6a97d6bcf5151466a7bab1e51d9bd6f5111d6;p=IRC.git changed the prefetch request send and prefetch response receive design delete the second retry in the prefetch cache added do while loop for trans_soft_abort case in transCommit() function minor bug fixes --- diff --git a/Robust/src/IR/Flat/BuildCode.java b/Robust/src/IR/Flat/BuildCode.java index adfaaf17..0f61a9dd 100644 --- a/Robust/src/IR/Flat/BuildCode.java +++ b/Robust/src/IR/Flat/BuildCode.java @@ -1530,10 +1530,6 @@ public class BuildCode { tstlbl += generateTemp(fm, id.getTempDescAt(i), lb) + "+"; } tstlbl += id.offset.toString(); - output.println("if ("+tstlbl+"< 0 || "+tstlbl+" >= "+ - generateTemp(fm, pp.base, lb) + "->___length___) {"); - output.println(" failedboundschk();"); - output.println("}"); TypeDescriptor elementtype = pp.base.getType().dereference(); String type=""; @@ -1542,13 +1538,9 @@ public class BuildCode { else type=elementtype.getSafeSymbol()+" "; - String oid = new String("(unsigned int) (" + generateTemp(fm, pp.base, lb) + " != NULL ? " + "((" + type + "*)(((char *) &("+ generateTemp(fm, pp.base, lb)+ "->___length___))+sizeof(int)))["+tstlbl+"] : 0)"); - - /* - test = "(("+tstlbl+"< 0) || ("+tstlbl+" >= "+ generateTemp(fm, pp.base, lb) + "->___length___))"; - String oid = new String("(unsigned int) (" +genarateTemp(fm, pp.base, lb) + " != NULL ? (" +test+ " ? 0 : ((" + type + "*)(((char *) &("+ generateTemp(fm, pp.base, lb)+ "->___length___))+sizeof(int)))["+tstlbl+"]) : 0);"); - */ - + String oid = new String("(unsigned int) (" + generateTemp(fm, pp.base, lb) + " != NULL ? " + + "((" + tstlbl+"< 0 || "+tstlbl+" >= "+ generateTemp(fm, pp.base, lb) + "->___length___) ? 0 :"+ + "((" + type + "*)(((char *) &("+ generateTemp(fm, pp.base, lb)+ "->___length___))+sizeof(int)))["+tstlbl+"]) : 0)"); oids.add(oid); } diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index c250439f..31deba12 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -244,7 +244,7 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *, int *); void checkPreCache(prefetchqelem_t *, int *, unsigned int, int); int transPrefetchProcess(transrecord_t *, int **, short); void sendPrefetchReq(prefetchpile_t*); -void getPrefetchResponse(int, int); +int getPrefetchResponse(int); unsigned short getObjType(unsigned int oid); int startRemoteThread(unsigned int oid, unsigned int mid); /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */ diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index b232cbef..1988c502 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -21,7 +21,6 @@ #define LISTEN_PORT 2156 #define BACKLOG 10 //max pending connections #define RECEIVE_BUFFER_SIZE 2048 -#define PRE_BUF_SIZE 2048 extern int classsize[]; @@ -114,9 +113,10 @@ void *dstmListen() * and accordingly calls other functions to process new requests */ void *dstmAccept(void *acceptfd) { - int val, retval, size; + int val, retval, size, sum; unsigned int oid; - char buffer[RECEIVE_BUFFER_SIZE], control,ctrl; + char *buffer; + char control,ctrl; char *ptr; void *srcObj; objheader_t *h; @@ -124,8 +124,6 @@ void *dstmAccept(void *acceptfd) unsigned short objType, *versionarry, version; unsigned int *oidarry, numoid, mid, threadid; - int i; - transinfo.objlocked = NULL; transinfo.objnotfound = NULL; transinfo.modptr = NULL; @@ -134,7 +132,7 @@ void *dstmAccept(void *acceptfd) /* Receive control messages from other machines */ if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) { - perror("Error: in receiving control from coordinator\n"); + printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__); pthread_exit(NULL); } @@ -196,6 +194,12 @@ void *dstmAccept(void *acceptfd) pthread_exit(NULL); } break; + case TRANS_PREFETCH_RESPONSE: + if((val = getPrefetchResponse((int) acceptfd)) != 0) { + printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__); + pthread_exit(NULL); + } + break; case START_REMOTE_THREAD: retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0); if (retval <= 0) @@ -211,53 +215,52 @@ void *dstmAccept(void *acceptfd) break; case THREAD_NOTIFY_REQUEST: - size = sizeof(unsigned int); - bzero(&buffer, RECEIVE_BUFFER_SIZE); - retval = recv((int)acceptfd, &buffer, size, 0); - numoid = *((unsigned int *) &buffer); + retval = recv((int)acceptfd, &numoid, sizeof(unsigned int), 0); size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); - bzero(&buffer, RECEIVE_BUFFER_SIZE); - retval = recv((int)acceptfd, &buffer, size, 0); - if(retval <=0) - perror("dstmAccept(): error receiving THREAD_NOTIFY_REQUEST"); - else if( retval != (2* sizeof(unsigned int) + (sizeof(unsigned int) + sizeof(unsigned short)) * numoid)) - printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_REQUEST %s, %d\n", retval, - __FILE__, __LINE__); - else { - oidarry = calloc(numoid, sizeof(unsigned int)); - memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); - size = sizeof(unsigned int) * numoid; - versionarry = calloc(numoid, sizeof(unsigned short)); - memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid); - size += sizeof(unsigned short) * numoid; - mid = *((unsigned int *)(buffer+size)); - size += sizeof(unsigned int); - threadid = *((unsigned int *)(buffer+size)); - processReqNotify(numoid, oidarry, versionarry, mid, threadid); + if((buffer = calloc(1,size)) == NULL) { + printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); + pthread_exit(NULL); } + sum = 0; + do { + sum += recv((int)acceptfd, buffer+sum, size-sum, 0); + } while(sum < size); + + oidarry = calloc(numoid, sizeof(unsigned int)); + memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); + size = sizeof(unsigned int) * numoid; + versionarry = calloc(numoid, sizeof(unsigned short)); + memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid); + size += sizeof(unsigned short) * numoid; + mid = *((unsigned int *)(buffer+size)); + size += sizeof(unsigned int); + threadid = *((unsigned int *)(buffer+size)); + processReqNotify(numoid, oidarry, versionarry, mid, threadid); + free(buffer); break; case THREAD_NOTIFY_RESPONSE: size = sizeof(unsigned short) + 2 * sizeof(unsigned int); - bzero(&buffer, RECEIVE_BUFFER_SIZE); - retval = recv((int)acceptfd, &buffer, size, 0); - if(retval <= 0) - perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE"); - else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short)) - printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_RESPONSE msg %s, %d\n", - retval, __FILE__, __LINE__); - else { - oid = *((unsigned int *)buffer); - size = sizeof(unsigned int); - version = *((unsigned short *)(buffer+size)); - size += sizeof(unsigned short); - threadid = *((unsigned int *)(buffer+size)); - threadNotify(oid,version,threadid); + if((buffer = calloc(1,size)) == NULL) { + printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__); + pthread_exit(NULL); } - break; + sum = 0; + do { + sum += recv((int)acceptfd, buffer+sum, size-sum, 0); + } while(sum < size); + + oid = *((unsigned int *)buffer); + size = sizeof(unsigned int); + version = *((unsigned short *)(buffer+size)); + size += sizeof(unsigned short); + threadid = *((unsigned int *)(buffer+size)); + threadNotify(oid,version,threadid); + free(buffer); + break; default: printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__); } @@ -669,122 +672,194 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock * then use offset values to prefetch references to other objects */ int prefetchReq(int acceptfd) { - int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0; - int isArray = 0; - unsigned int oid, index = 0; - char *ptr, buffer[PRE_BUF_SIZE]; - void *mobj; - unsigned int objoid; - char control; - objheader_t * header; - int bytesRecvd; + int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0; + int length, sd; + char *recvbuffer, *sendbuffer, control; + unsigned int oid, mid; + unsigned short *offsetarry; + objheader_t *header; + struct sockaddr_in remoteAddr; - /* Repeatedly recv one oid and offset pair sent for prefetch */ - while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) { - count++; - if(length == -1) + while((numbytes = recv((int)acceptfd, &length, sizeof(int), 0)) != 0) { + if(length == -1) { //-1 is special character to represent end of sending oids and offsets break; - index = 0; - bytesRecvd = 0; - do { - bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd, - sizeof(unsigned int) - bytesRecvd, 0); - } while (bytesRecvd < sizeof(unsigned int)); - numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short); - N = numoffset * sizeof(short); - short offset[numoffset]; - ptr = (char *)&offset; - sum = 0; - /* Recv the offset values per oid */ - do { - n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); - sum += n; - } while(sum < N && n != 0); + } else { + numbytes = 0; + size = length - sizeof(int); + if((recvbuffer = calloc(1, size)) == NULL) { + printf("Calloc error at %s,%d\n", __FILE__, __LINE__); + return -1; + } + while(numbytes < size) { + numbytes += recv((int)acceptfd, recvbuffer+numbytes, size-numbytes, 0); + } - bzero(&buffer, PRE_BUF_SIZE); - /* Process each oid */ - if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ - /* Save the oids not found in buffer for later use */ - *(buffer + index) = OBJECT_NOT_FOUND; - index += sizeof(char); - *((unsigned int *)(buffer+index)) = oid; - index += sizeof(unsigned int); - } else { /* If Obj found in machine (i.e. has not moved) */ - /* send the oid, it's size, it's header and data */ - header = (objheader_t *)mobj; - GETSIZE(size, header); - size += sizeof(objheader_t); - *(buffer + index) = OBJECT_FOUND; - index += sizeof(char); - *((unsigned int *)(buffer+index)) = oid; - index += sizeof(unsigned int); - *((int *)(buffer+index)) = size; - index += sizeof(int); - memcpy(buffer + index, header, size); - index += size; - /* Calculate the oid corresponding to the offset value */ - for(i = 0 ; i< numoffset ; i++) { - /* Check for arrays */ - if(TYPE(header) > NUMCLASSES) { - isArray = 1; + oid = *((unsigned int *) recvbuffer); + mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int))); + size = size - (2 * sizeof(unsigned int)); + numoffset = size / sizeof(short); + if((offsetarry = calloc(numoffset, sizeof(unsigned short))) == NULL) { + printf("Calloc error at %s,%d\n", __FILE__, __LINE__); + free(recvbuffer); + return -1; + } + memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size); + free(recvbuffer); + + /* Create socket to send information */ + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("prefetchReq():socket()"); + return; + } + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + + if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + close(sd); + return -1; + } + + /*Process each oid */ + if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */ + /* Save the oids not found in buffer for later use */ + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; + if((sendbuffer = calloc(1, size)) == NULL) { + printf("Calloc error at %s,%d\n", __FILE__, __LINE__); + free(offsetarry); + close(sd); + return -1; } - if(isArray == 1) { - int elementsize = classsize[TYPE(header)]; - objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i]))); - } else { - objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i])); + *((int *) sendbuffer) = size; + *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; + *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; + + control = TRANS_PREFETCH_RESPONSE; + if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { + free(offsetarry); + printf("Error: %s() in sending prefetch response at %s, %d\n", + __func__, __FILE__, __LINE__); + close(sd); + return -1; + } + } else { /* Object Found */ + int incr = 0; + GETSIZE(objsize, header); + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; + if((sendbuffer = calloc(1, size)) == NULL) { + printf("Calloc error at %s,%d\n", __FILE__, __LINE__); + free(offsetarry); + close(sd); + return -1; } - if((header = mhashSearch(objoid)) == NULL) { - /* Obj not found, send oid */ - *(buffer + index) = OBJECT_NOT_FOUND; - index += sizeof(char); - *((unsigned int *)(buffer+index)) = objoid; - index += sizeof(unsigned int); - break; - } else {/* Obj Found */ - /* send the oid, it's size, it's header and data */ - GETSIZE(size, header); - size+=sizeof(objheader_t); - *(buffer+index) = OBJECT_FOUND; - index += sizeof(char); - *((unsigned int *)(buffer+index)) = objoid; - index += sizeof(unsigned int); - *((int *)(buffer+index)) = size; - index += sizeof(int); - memcpy(buffer+index, header, size); - index += size; + *((int *) (sendbuffer + incr)) = size; + incr += sizeof(int); + *((char *)(sendbuffer + incr)) = OBJECT_FOUND; + incr += sizeof(char); + *((unsigned int *)(sendbuffer+incr)) = oid; + incr += sizeof(unsigned int); + memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); + + control = TRANS_PREFETCH_RESPONSE; + if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { + free(offsetarry); + printf("Error: %s() in sending prefetch response at %s, %d\n", + __func__, __FILE__, __LINE__); + close(sd); + return -1; + } + /* Calculate the oid corresponding to the offset value */ + for(i = 0 ; i< numoffset ; i++) { + /* Check for arrays */ + if(TYPE(header) > NUMCLASSES) { + isArray = 1; + } + if(isArray == 1) { + int elementsize = classsize[TYPE(header)]; + oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i]))); + } else { + oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i])); + } + + if((header = mhashSearch(oid)) == NULL) { + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; + if((sendbuffer = calloc(1, size)) == NULL) { + printf("Calloc error at %s,%d\n", __FILE__, __LINE__); + free(offsetarry); + close(sd); + return -1; + } + *((int *) sendbuffer) = size; + *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; + *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; + + control = TRANS_PREFETCH_RESPONSE; + if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { + free(offsetarry); + printf("Error: %s() in sending prefetch response at %s, %d\n", + __FILE__, __LINE__); + close(sd); + return -1; + } + break; + } else {/* Obj Found */ + int incr = 0; + GETSIZE(objsize, header); + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; + if((sendbuffer = calloc(1, size)) == NULL) { + printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__); + free(offsetarry); + close(sd); + return -1; + } + *((int *) (sendbuffer + incr)) = size; + incr += sizeof(int); + *((char *)(sendbuffer + incr)) = OBJECT_FOUND; + incr += sizeof(char); + *((unsigned int *)(sendbuffer+incr)) = oid; + incr += sizeof(unsigned int); + memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); + + control = TRANS_PREFETCH_RESPONSE; + if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { + free(offsetarry); + printf("Error: %s() in sending prefetch response at %s, %d\n", + __func__, __FILE__, __LINE__); + close(sd); + return -1; + } + } isArray = 0; - continue; } + free(offsetarry); } } + } + close(sd); + return 0; +} - /* Check for overflow in the buffer */ - if (index >= PRE_BUF_SIZE) { - printf("Error: Buffer array overflow %s, %d\n", __FILE__, __LINE__); - return 1; - } - /* Send Prefetch response control message only once*/ - if(count == 1){ - control = TRANS_PREFETCH_RESPONSE; - if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { - perror("Error: in sending PREFETCH RESPONSE to Coordinator\n"); - return 1; - } - } +int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) { + int numbytes = 0; - //Send buffer size - if((numbytes = send(acceptfd, &index, sizeof(unsigned int), MSG_NOSIGNAL)) < sizeof(unsigned int)) { - perror("Error: in sending PREFETCH RESPONSE to Coordinator\n"); - return 1; - } + if((numbytes = send(sd, control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { + printf("%s() Error: in sending PREFETCH RESPONSE to Coordinator at %s, %d\n", __func__, __FILE__, __LINE__); + free(sendbuffer); + return -1; + } - /* Send the entire buffer with its size and oids found and not found */ - if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index)) { - perror("Error: sending oids found\n"); - return 1; - } + /* Send the buffer with its size */ + if((numbytes = send(sd, sendbuffer, *(size), MSG_NOSIGNAL)) < *(size)) { + printf("%s() Error: in sending oid found at %s, %d size sent = %d, actual size = %d\n", + __func__, __FILE__, __LINE__, numbytes, *(size)); + free(sendbuffer); + return -1; } + + free(sendbuffer); return 0; } @@ -796,7 +871,7 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short int sd; struct sockaddr_in remoteAddr; int bytesSent; - int status, size; + int size; int i = 0; while(i < numoid) { @@ -832,7 +907,8 @@ checkversion: if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno, inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); - status = -1; + close(sd); + return; } else { //Send Update notification msg[0] = THREAD_NOTIFY_RESPONSE; @@ -844,13 +920,16 @@ checkversion: bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0); if (bytesSent < 0){ perror("processReqNotify():send()"); - status = -1; + close(sd); + return; } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){ printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n", bytesSent, __FILE__, __LINE__); - status = -1; + close(sd); + return; } else { - status = 0; + close(sd); + return; } } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index eb3b7647..eec4a90b 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -23,7 +23,6 @@ #endif #define LISTEN_PORT 2156 -#define RECEIVE_BUFFER_SIZE 2048 #define NUM_THREADS 10 #define PREFETCH_CACHE_SIZE 1048576 //1MB #define CONFIG_FILENAME "dstm.conf" @@ -90,7 +89,7 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short int qnodesize; int len = 0; int i, rc; - + /* Allocate for the queue node*/ char *node; if(ntuples > 0) { @@ -259,6 +258,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { struct timeval tp; if(oid == 0) { + printf("Error: %s, %d oid is NULL \n", __FILE__, __LINE__); return NULL; } @@ -277,6 +277,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { /* Search local transaction cache */ if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ + #ifdef COMPILER return &objheader[1]; #else @@ -309,10 +310,11 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { #endif } else { /*If object not found in prefetch cache then block until object appears in the prefetch cache */ + /* pthread_mutex_lock(&pflookup.lock); while(!found) { rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts); - /* Check Prefetch cache again */ + // Check Prefetch cache again if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) { found = 1; GETSIZE(size,tmp); @@ -331,6 +333,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { break; } } + */ /* Get the object from the remote location */ if((machinenumber = lhashSearch(oid)) == 0) { @@ -343,6 +346,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); return NULL; } else { + #ifdef COMPILER return &objcopy[1]; #else @@ -429,172 +433,186 @@ int transCommit(transrecord_t *record) { plistnode_t *pile, *pile_ptr; int i, j, rc, val; int pilecount, offset, threadnum = 0, trecvcount = 0; - char buffer[RECEIVE_BUFFER_SIZE],control; + char control; char transid[TID_LEN]; trans_req_data_t *tosend; trans_commit_data_t transinfo; static int newtid = 0; char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */ char localstat = 0; + thread_data_array_t *thread_data_array; + local_thread_data_array_t *ltdata; + do { + trecvcount = 0; + threadnum = 0; + treplyretry = 0; + thread_data_array = NULL; + ltdata = NULL; + /* Look through all the objects in the transaction record and make piles + * for each machine involved in the transaction*/ + pile_ptr = pile = createPiles(record); - /* Look through all the objects in the transaction record and make piles - * for each machine involved in the transaction*/ - pile_ptr = pile = createPiles(record); - - /* Create the packet to be sent in TRANS_REQUEST */ - - /* Count the number of participants */ - pilecount = pCount(pile); - - /* Create a list of machine ids(Participants) involved in transaction */ - if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; - } - pListMid(pile, listmid); - + /* Create the packet to be sent in TRANS_REQUEST */ - /* Initialize thread variables, - * Spawn a thread for each Participant involved in a transaction */ - pthread_t thread[pilecount]; - pthread_attr_t attr; - pthread_cond_t tcond; - pthread_mutex_t tlock; - pthread_mutex_t tlshrd; + /* Count the number of participants */ + pilecount = pCount(pile); - thread_data_array_t *thread_data_array; - if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - return 1; - } + /* Create a list of machine ids(Participants) involved in transaction */ + if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + pListMid(pile, listmid); - local_thread_data_array_t *ltdata; - if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - free(thread_data_array); - return 1; - } - thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ + /* Initialize thread variables, + * Spawn a thread for each Participant involved in a transaction */ + pthread_t thread[pilecount]; + pthread_attr_t attr; + pthread_cond_t tcond; + pthread_mutex_t tlock; + pthread_mutex_t tlshrd; - /* Initialize and set thread detach attribute */ - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_mutex_init(&tlock, NULL); - pthread_cond_init(&tcond, NULL); + if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + return 1; + } - /* Process each machine pile */ - while(pile != NULL) { - //Create transaction id - newtid++; - if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { + if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); pDelete(pile_ptr); free(listmid); free(thread_data_array); - free(ltdata); return 1; } - tosend->f.control = TRANS_REQUEST; - sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); - tosend->f.mcount = pilecount; - tosend->f.numread = pile->numread; - tosend->f.nummod = pile->nummod; - tosend->f.numcreated = pile->numcreated; - tosend->f.sum_bytes = pile->sum_bytes; - tosend->listmid = listmid; - tosend->objread = pile->objread; - tosend->oidmod = pile->oidmod; - tosend->oidcreated = pile->oidcreated; - thread_data_array[threadnum].thread_id = threadnum; - thread_data_array[threadnum].mid = pile->mid; - thread_data_array[threadnum].buffer = tosend; - thread_data_array[threadnum].recvmsg = rcvd_control_msg; - thread_data_array[threadnum].threshold = &tcond; - thread_data_array[threadnum].lock = &tlock; - thread_data_array[threadnum].count = &trecvcount; - thread_data_array[threadnum].replyctrl = &treplyctrl; - thread_data_array[threadnum].replyretry = &treplyretry; - thread_data_array[threadnum].rec = record; - /* If local do not create any extra connection */ - if(pile->mid != myIpAddr) { /* Not local */ - do { - rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]); - } while(rc!=0); - if(rc) { - perror("Error in pthread create\n"); + + thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ + + /* Initialize and set thread detach attribute */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_mutex_init(&tlock, NULL); + pthread_cond_init(&tcond, NULL); + + /* Process each machine pile */ + while(pile != NULL) { + //Create transaction id + newtid++; + if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); pDelete(pile_ptr); free(listmid); - for (i = 0; i < threadnum; i++) - free(thread_data_array[i].buffer); free(thread_data_array); free(ltdata); return 1; } - } else { /*Local*/ - ltdata->tdata = &thread_data_array[threadnum]; - ltdata->transinfo = &transinfo; - do { - val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata); - } while(val!=0); - if(val) { - perror("Error in pthread create\n"); + tosend->f.control = TRANS_REQUEST; + sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); + tosend->f.mcount = pilecount; + tosend->f.numread = pile->numread; + tosend->f.nummod = pile->nummod; + tosend->f.numcreated = pile->numcreated; + tosend->f.sum_bytes = pile->sum_bytes; + tosend->listmid = listmid; + tosend->objread = pile->objread; + tosend->oidmod = pile->oidmod; + tosend->oidcreated = pile->oidcreated; + thread_data_array[threadnum].thread_id = threadnum; + thread_data_array[threadnum].mid = pile->mid; + thread_data_array[threadnum].buffer = tosend; + thread_data_array[threadnum].recvmsg = rcvd_control_msg; + thread_data_array[threadnum].threshold = &tcond; + thread_data_array[threadnum].lock = &tlock; + thread_data_array[threadnum].count = &trecvcount; + thread_data_array[threadnum].replyctrl = &treplyctrl; + thread_data_array[threadnum].replyretry = &treplyretry; + thread_data_array[threadnum].rec = record; + /* If local do not create any extra connection */ + if(pile->mid != myIpAddr) { /* Not local */ + do { + rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]); + } while(rc!=0); + if(rc) { + perror("Error in pthread create\n"); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); + free(thread_data_array); + free(ltdata); + return 1; + } + } else { /*Local*/ + ltdata->tdata = &thread_data_array[threadnum]; + ltdata->transinfo = &transinfo; + do { + val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata); + } while(val!=0); + if(val) { + perror("Error in pthread create\n"); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); + free(thread_data_array); + free(ltdata); + return 1; + } + } + + threadnum++; + pile = pile->next; + } + /* Free attribute and wait for the other threads */ + pthread_attr_destroy(&attr); + + for (i = 0; i < threadnum; i++) { + rc = pthread_join(thread[i], NULL); + if(rc) + { + printf("Error: return code from pthread_join() is %d\n", rc); pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); pDelete(pile_ptr); free(listmid); - for (i = 0; i < threadnum; i++) - free(thread_data_array[i].buffer); - free(thread_data_array); - free(ltdata); + for (j = i; j < threadnum; j++) { + free(thread_data_array[j].buffer); + } return 1; } + free(thread_data_array[i].buffer); } - threadnum++; - pile = pile->next; - } - - /* Free attribute and wait for the other threads */ - pthread_attr_destroy(&attr); + /* Free resources */ + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + free(listmid); + pDelete(pile_ptr); - for (i = 0; i < threadnum; i++) { - rc = pthread_join(thread[i], NULL); - if(rc) - { - printf("Error: return code from pthread_join() is %d\n", rc); - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - pDelete(pile_ptr); - free(listmid); - for (j = i; j < threadnum; j++) { - free(thread_data_array[j].buffer); - } - return 1; + /* wait a random amount of time before retrying to commit transaction*/ + if(treplyretry == 1) { + free(thread_data_array); + free(ltdata); + randomdelay(); } - free(thread_data_array[i].buffer); - } - /* Free resources */ - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - free(listmid); - pDelete(pile_ptr); - + /* Retry trans commit procedure during soft_abort case */ + } while (treplyretry == 1); + if(treplyctrl == TRANS_ABORT) { /* Free Resources */ @@ -614,7 +632,7 @@ int transCommit(transrecord_t *record) { return 0; } else { //TODO Add other cases - printf("DEBUG-> THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n"); + printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__); exit(-1); } @@ -630,7 +648,7 @@ void *transRequest(void *threadarg) { struct sockaddr_in serv_addr; thread_data_array_t *tdata; objheader_t *headeraddr; - char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; + char control, recvcontrol; char machineip[16], retval; tdata = (thread_data_array_t *) threadarg; @@ -894,10 +912,16 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { return NULL; } objcopy = objstrAlloc(record->cache, size); + int sum = 0; + while (sum < size) { + sum += read(sd, (char *)objcopy+sum, size-sum); + } + /* if((val = read(sd, (char *)objcopy, size)) <= 0) { perror("No objects are read from the remote participant\n"); return NULL; } + */ /* Insert into cache's lookup table */ chashInsert(record->lookupTable, oid, objcopy); break; @@ -1246,8 +1270,7 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) { prefetchpile_t *foundLocal(prefetchqelem_t *node) { int ntuples,i, j, k, oidnfound = 0, arryfieldindex,nextarryfieldindex, flag = 0, val; unsigned int *oid; - unsigned int objoid; - int isArray = 0; + int isArray; char *ptr, *tmp; objheader_t *objheader; short *endoffsets, *arryfields; @@ -1265,6 +1288,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { for(i = 1; i NUMCLASSES) { isArray = 1; @@ -1325,7 +1352,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { flag = 1; checkPreCache(node, numoffset, oid[i], i); break; - } + } tmp = (char *) objheader; isArray = 0; } @@ -1545,13 +1572,15 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode) { while(tmp != NULL) { off = 0; count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */ - len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(unsigned short)); + len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(unsigned short)); char oidnoffset[len]; bzero(oidnoffset, len); *((unsigned int*)oidnoffset) = len; off = sizeof(int); *((unsigned int *)((char *)oidnoffset + off)) = tmp->oid; off += sizeof(unsigned int); + *((unsigned int *)((char *)oidnoffset + off)) = myIpAddr; //Recently added as of 03/03/2008 at 6:00pm + off += sizeof(unsigned int); for(i = 0; i < tmp->numoffset; i++) { *((unsigned short*)((char *)oidnoffset + off)) = tmp->offset[i]; off+=sizeof(unsigned short); @@ -1574,105 +1603,78 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode) { return; } - /* Get Response from the remote machine */ - getPrefetchResponse(count,sd); close(sd); return; } -void getPrefetchResponse(int count, int sd) { - int i = 0, val, n, N, sum, index, objsize; - unsigned int bufsize,oid; - char *buffer; - char control; - char *ptr; +int getPrefetchResponse(int sd) { + int numbytes = 0, length = 0, size = 0; + char *recvbuffer, control; + unsigned int oid; void *modptr, *oldptr; - /* Read prefetch response from the Remote machine */ - if((val = read(sd, &control, sizeof(char))) <= 0) { - perror("No control response for Prefetch request sent\n"); - return; - } + if((numbytes = recv((int)sd, &length, sizeof(int), 0)) <= 0) { + printf("%s() Error: in receiving length at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } else { + numbytes = 0; + size = length - sizeof(int); + if((recvbuffer = calloc(1, size)) == NULL) { + printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__); + return -1; + } + while(numbytes < size) { + numbytes += recv((int)sd, recvbuffer+numbytes, size-numbytes, 0); + } - if(control == TRANS_PREFETCH_RESPONSE) { - /*For each oid and offset tuple sent as prefetch request to remote machine*/ - while(N = recv((int)sd, &bufsize, sizeof(unsigned int), 0) != 0) { - if((buffer = calloc(1, bufsize)) == NULL) { - printf("Calloc Error in %s() at %s, %d\n", __func__, __FILE__, __LINE__); - return; + control = *((char *) recvbuffer); + if(control == OBJECT_FOUND) { + numbytes = 0; + oid = *((unsigned int *)(recvbuffer + sizeof(char))); + size = size - (sizeof(char) + sizeof(unsigned int)); + pthread_mutex_lock(&prefetchcache_mutex); + if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) { + printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); + free(recvbuffer); + return -1; } - sum = 0; - index = 0; - ptr = buffer; - /* Keep receiving the buffer containing oid info */ - do { - n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0); - sum +=n; - } while(sum < bufsize && n != 0); - - /* Decode the contents of the buffer */ - while(index < bufsize ) { - if(buffer[index] == OBJECT_FOUND) { - /* Increment it to get the object */ - index += sizeof(char); - oid = *((unsigned int *)(buffer+index)); - index += sizeof(unsigned int); - /* For each object found add to Prefetch Cache */ - objsize = *((int *)(buffer+index)); - index += sizeof(int); - pthread_mutex_lock(&prefetchcache_mutex); - if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) { - printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&prefetchcache_mutex); - free(buffer); - return; - } - pthread_mutex_unlock(&prefetchcache_mutex); - memcpy(modptr, buffer+index, objsize); - index += objsize; - /* Insert the oid and its address into the prefetch hash lookup table */ - /* Do a version comparison if the oid exists */ - if((oldptr = prehashSearch(oid)) != NULL) { - /* If older version then update with new object ptr */ - if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) { - prehashRemove(oid); - prehashInsert(oid, modptr); - } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) { - /* Add the new object ptr to hash table */ - prehashRemove(oid); - prehashInsert(oid, modptr); - } else { /* Do nothing: TODO modptr should be reference counted */ - ; - } - } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/ - prehashInsert(oid, modptr); - } - /* Lock the Prefetch Cache look up table*/ - pthread_mutex_lock(&pflookup.lock); - /* Broadcast signal on prefetch cache condition variable */ - pthread_cond_broadcast(&pflookup.cond); - /* Unlock the Prefetch Cache look up table*/ - pthread_mutex_unlock(&pflookup.lock); - } else if(buffer[index] == OBJECT_NOT_FOUND) { - /* Increment it to get the object */ - /* TODO: For each object not found query DHT for new location and retrieve the object */ - index += sizeof(char); - oid = *((unsigned int *)(buffer + index)); - index += sizeof(unsigned int); - /* Throw an error */ - printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n"); - exit(-1); + pthread_mutex_unlock(&prefetchcache_mutex); + memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size); + + /* Insert the oid and its address into the prefetch hash lookup table */ + /* Do a version comparison if the oid exists */ + if((oldptr = prehashSearch(oid)) != NULL) { + /* If older version then update with new object ptr */ + if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) { + prehashRemove(oid); + prehashInsert(oid, modptr); } else { - printf("Error in decoding the index value %d, %s, %d\n",index, __FILE__, __LINE__); - free(buffer); - return; + /* TODO modptr should be reference counted */ } + } else {/* Else add the object ptr to hash table*/ + prehashInsert(oid, modptr); } - free(buffer); + /* Lock the Prefetch Cache look up table*/ + pthread_mutex_lock(&pflookup.lock); + /* Broadcast signal on prefetch cache condition variable */ + pthread_cond_broadcast(&pflookup.cond); + /* Unlock the Prefetch Cache look up table*/ + pthread_mutex_unlock(&pflookup.lock); + } else if(control == OBJECT_NOT_FOUND) { + oid = *((unsigned int *)(recvbuffer + sizeof(char))); + /* TODO: For each object not found query DHT for new location and retrieve the object */ + /* Throw an error */ + printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n"); + free(recvbuffer); + exit(-1); + } else { + printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__); } - } else - printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__); - return; + free(recvbuffer); + } + + return 0; } unsigned short getObjType(unsigned int oid)