pool thread processing complete
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
index 87f21dec75eda9cbcafb296e78a61ea6e2f4473d..33cbd581ba2fcdca9f80b273716bfbf8bd1be1b6 100644 (file)
@@ -530,20 +530,23 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
 }
 
 int prefetchReq(int acceptfd) {
-       int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size;
+       int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
        unsigned int oid, index = 0;
        char *ptr, buffer[PRE_BUF_SIZE];
        void *mobj;
-       unsigned int *oidnotfound, objoid;
-       char *header;
+       unsigned int objoid;
+       char *header, control;
        objheader_t * head;
        
        /* Repeatedly recv the oid and offset pairs sent for prefetch */
        while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
+               count++;
                if(length == -1)
                        break;
                sum = 0;
-               index = 0;
+               index = sizeof(unsigned int); // Index starts with sizeof  unsigned int because the 
+                                             // first 4 bytes are saved to send the
+                                             // size of the buffer (that is computed at the end of the loop)
                oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
                numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
                N = numoffset * sizeof(short);
@@ -564,7 +567,7 @@ int prefetchReq(int acceptfd) {
                        memcpy(buffer+index, &oid, sizeof(unsigned int));
                        index += sizeof(unsigned int);
                } else { /* If Obj found in machine (i.e. has not moved) */
-                       /* Return the oid ..its header and data */
+                       /* send the oid, it's size, it's header and data */
                        header = (char *) mobj;
                        head = (objheader_t *) header; 
                        size = sizeof(objheader_t) + sizeof(classsize[head->type]);
@@ -572,25 +575,30 @@ int prefetchReq(int acceptfd) {
                        index += sizeof(char);
                        memcpy(buffer+index, &oid, sizeof(unsigned int));
                        index += sizeof(unsigned int);
+                       memcpy(buffer+index, &size, sizeof(int));
+                       index += sizeof(int);
                        memcpy(buffer + index, header, size);
                        index += size;
                        /* Calculate the oid corresponding to the offset value */
                        for(i = 0 ; i< numoffset ; i++) {
                                objoid = *((int *)(header + sizeof(objheader_t) + offset[i]));
                                if((header = (char *) mhashSearch(objoid)) == NULL) {
-                                       /* Obj not found, send oid and its offsets */
+                                       /* Obj not found, send oid */
                                        *(buffer + index) = OBJECT_NOT_FOUND;
                                        index += sizeof(char);
                                        memcpy(buffer+index, &oid, sizeof(unsigned int));
                                        index += sizeof(unsigned int);
                                        break;
                                } else {/* Obj Found */
+                                       /* send the oid, it's size, it's header and data */
                                        head = (objheader_t *) header; 
                                        size = sizeof(objheader_t) + sizeof(classsize[head->type]);
                                        *(buffer + index) = OBJECT_FOUND;
                                        index += sizeof(char);
                                        memcpy(buffer+index, &oid, sizeof(unsigned int));
                                        index += sizeof(unsigned int);
+                                       memcpy(buffer+index, &size, sizeof(int));
+                                       index += sizeof(int);
                                        memcpy(buffer + index, header, size);
                                        index += size;
                                        continue;
@@ -602,12 +610,22 @@ int prefetchReq(int acceptfd) {
                        printf("Char buffer is overflowing\n");
                        return 1;
                }
-               /* Send the buffer with all oids found and not found */
+               /* Send Prefetch response control message only once*/
+               if(count == 1) {
+                       control = TRANS_PREFETCH_RESPONSE;
+                       if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
+                               perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
+                               return 1;
+                       }
+               }
+
+               /* Send the buffer size */
+               memcpy(buffer, &index, sizeof(unsigned int));
+               /* Send the entire buffer with its size and oids found and not found */
                if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) {
-                       perror("Error sending size of object\n");
+                       perror("Error sending oids found\n");
                        return 1;
                }
        }
-
        return 0;
 }