From: adash Date: Wed, 20 Feb 2008 04:54:19 +0000 (+0000) Subject: some bug fixes for sending and receiving objects (that form the prefetch tuples) X-Git-Tag: preEdgeChange~263 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=fcf239ec98d94cd2a6822666e77007549d0d8d61;p=IRC.git some bug fixes for sending and receiving objects (that form the prefetch tuples) while serving the prefetch request across machines. --- diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 4434ee00..a91134af 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -651,23 +651,13 @@ int prefetchReq(int acceptfd) { char control; objheader_t * header; int bytesRecvd; -/* - unsigned int myIpAddr; -#ifdef MAC - myIpAddr = getMyIpAddr("en1"); -#else - myIpAddr = getMyIpAddr("eth0"); -#endif -*/ - /* Repeatedly recv the oid and offset pairs sent for prefetch */ + /* Repeatedly recv one oid and offset pair sent for prefetch */ while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) { count++; if(length == -1) break; - index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the - // first 4 bytes are saved to send the - // size of the buffer (that is computed at the end of the loop) + index = 0; bytesRecvd = 0; do { bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd, @@ -684,6 +674,7 @@ int prefetchReq(int acceptfd) { sum += n; } while(sum < N && n != 0); + bzero(&buffer, PRE_BUF_SIZE); /* Process each oid */ if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ /* Save the oids not found in buffer for later use */ @@ -740,6 +731,7 @@ int prefetchReq(int acceptfd) { } } } + /* Check for overflow in the buffer */ if (index >= PRE_BUF_SIZE) { printf("Error: Buffer array overflow %s, %d\n", __FILE__, __LINE__); @@ -754,11 +746,14 @@ int prefetchReq(int acceptfd) { } } - /* Add the buffer size into buffer as a parameter */ - *((unsigned int *)buffer)=index; + //Send buffer size + if((numbytes = send(acceptfd, &index, sizeof(unsigned int), MSG_NOSIGNAL)) < sizeof(unsigned int)) { + perror("Error: in sending PREFETCH RESPONSE to Coordinator\n"); + return 1; + } /* Send the entire buffer with its size and oids found and not found */ - if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) { + if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index)) { perror("Error: sending oids found\n"); return 1; } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 247cea42..d560c825 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -91,10 +91,6 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short int len = 0; int i, rc; - //do { - // rc=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); - //} while(rc!=0); - /* Allocate for the queue node*/ char *node; if(ntuples > 0) { @@ -1590,16 +1586,11 @@ void getPrefetchResponse(int count, int sd) { if(control == TRANS_PREFETCH_RESPONSE) { /*For each oid and offset tuple sent as prefetch request to remote machine*/ - while(i < count) { + while(N = recv((int)sd, &bufsize, sizeof(unsigned int), 0) != 0) { + bzero(&buffer, RECEIVE_BUFFER_SIZE); sum = 0; index = 0; - /* Read the size of buffer to be received */ - if((N = read(sd, buffer, sizeof(unsigned int))) <= 0) { - perror("Size of buffer not recv\n"); - return; - } - bufsize = *((unsigned int *) buffer); - ptr = buffer + sizeof(unsigned int); + ptr = buffer; /* Keep receiving the buffer containing oid info */ do { n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0); @@ -1607,8 +1598,7 @@ void getPrefetchResponse(int count, int sd) { } while(sum < bufsize && n != 0); /* Decode the contents of the buffer */ - index = sizeof(unsigned int); - while(index < (bufsize - sizeof(unsigned int))) { + while(index < bufsize ) { if(buffer[index] == OBJECT_FOUND) { /* Increment it to get the object */ index += sizeof(char); @@ -1659,12 +1649,10 @@ void getPrefetchResponse(int count, int sd) { printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n"); exit(-1); } else { - printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__); + printf("Error in decoding the index value %d, %s, %d\n",index, __FILE__, __LINE__); return; } } - - i++; } } else printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);