some bug fixes for sending and receiving objects (that form the prefetch tuples)
authoradash <adash>
Wed, 20 Feb 2008 04:54:19 +0000 (04:54 +0000)
committeradash <adash>
Wed, 20 Feb 2008 04:54:19 +0000 (04:54 +0000)
while serving the  prefetch request across machines.

Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 4434ee002b91270cb0fba941d010467ec08d36d7..a91134af64e1f049ff526572b1789fe5f61ffa7e 100644 (file)
@@ -651,23 +651,13 @@ int prefetchReq(int acceptfd) {
        char control;
        objheader_t * header;
        int bytesRecvd;
-/*
-       unsigned int myIpAddr;
 
-#ifdef MAC
-       myIpAddr = getMyIpAddr("en1");
-#else
-       myIpAddr = getMyIpAddr("eth0");
-#endif
-*/
-       /* Repeatedly recv the oid and offset pairs sent for prefetch */
+       /* Repeatedly recv one oid and offset pair sent for prefetch */
        while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
                count++;
                if(length == -1)
                        break;
-               index = sizeof(unsigned int); // Index starts with sizeof  unsigned int because the 
-               // first 4 bytes are saved to send the
-               // size of the buffer (that is computed at the end of the loop)
+               index = 0;  
                bytesRecvd = 0;
                do {
                        bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
@@ -684,6 +674,7 @@ int prefetchReq(int acceptfd) {
                        sum += n; 
                } while(sum < N && n != 0);     
 
+               bzero(&buffer, PRE_BUF_SIZE);
                /* Process each oid */
                if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
                        /* Save the oids not found in buffer for later use */
@@ -740,6 +731,7 @@ int prefetchReq(int acceptfd) {
                                }
                        }
                }
+
                /* Check for overflow in the buffer */
                if (index >= PRE_BUF_SIZE) {
                        printf("Error: Buffer array overflow %s, %d\n", __FILE__, __LINE__);
@@ -754,11 +746,14 @@ int prefetchReq(int acceptfd) {
                        }
                }
 
-               /* Add the buffer size into buffer as a parameter */
-               *((unsigned int *)buffer)=index;
+               //Send buffer size 
+               if((numbytes = send(acceptfd, &index, sizeof(unsigned int), MSG_NOSIGNAL)) < sizeof(unsigned int)) {
+                       perror("Error: in sending PREFETCH RESPONSE to Coordinator\n");
+                       return 1;
+               }
 
                /* Send the entire buffer with its size and oids found and not found */
-               if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
+               if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index)) {
                        perror("Error: sending oids found\n");
                        return 1;
                }
index 247cea421a991204b6b3be527943c45deeb31707..d560c825f0d981ffc3c3d5cb00211b329dfc6f99 100644 (file)
@@ -91,10 +91,6 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
        int len = 0;
        int i, rc;
        
-       //do {
-       //      rc=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
-       //} while(rc!=0);
-       
        /* Allocate for the queue node*/
        char *node;
        if(ntuples > 0) {
@@ -1590,16 +1586,11 @@ void getPrefetchResponse(int count, int sd) {
 
        if(control == TRANS_PREFETCH_RESPONSE) {
                /*For each oid and offset tuple sent as prefetch request to remote machine*/
-               while(i < count) {
+               while(N = recv((int)sd, &bufsize, sizeof(unsigned int), 0) != 0) {
+                       bzero(&buffer, RECEIVE_BUFFER_SIZE);
                        sum = 0;
                        index = 0;
-                       /* Read the size of buffer to be received */
-                       if((N = read(sd, buffer, sizeof(unsigned int))) <= 0) {
-                               perror("Size of buffer not recv\n");
-                               return;
-                       }
-                       bufsize = *((unsigned int *) buffer);
-                       ptr = buffer + sizeof(unsigned int);
+                       ptr = buffer;
                        /* Keep receiving the buffer containing oid info */ 
                        do {
                                n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0);
@@ -1607,8 +1598,7 @@ void getPrefetchResponse(int count, int sd) {
                        } while(sum < bufsize && n != 0);
 
                        /* Decode the contents of the buffer */
-                       index = sizeof(unsigned int);
-                       while(index < (bufsize - sizeof(unsigned int))) {
+                       while(index < bufsize ) {
                                if(buffer[index] == OBJECT_FOUND) {
                                        /* Increment it to get the object */
                                        index += sizeof(char);
@@ -1659,12 +1649,10 @@ void getPrefetchResponse(int count, int sd) {
                                        printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
                                        exit(-1);
                                } else {
-                                       printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__);
+                                       printf("Error in decoding the index value %d, %s, %d\n",index, __FILE__, __LINE__);
                                        return;
                                }
                        }
-
-                       i++;
                }
        } else
                printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);