pool thread processing complete
authoradash <adash>
Mon, 6 Aug 2007 08:18:23 +0000 (08:18 +0000)
committeradash <adash>
Mon, 6 Aug 2007 08:18:23 +0000 (08:18 +0000)
TODO deallocate dequeued nodes

Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index f2f477fa6105db1557cfcabc4e8c133b21f5db4f..363f2f1d4442d5607ebf5d0a92e18c218527bb1a 100644 (file)
@@ -32,6 +32,7 @@
 #define TRANS_AGREE_BUT_MISSING_OBJECTS        19
 #define TRANS_SOFT_ABORT               20
 #define TRANS_SUCESSFUL                        21
+#define TRANS_PREFETCH_RESPONSE                22
 
 //Control bits for status of objects in Machine pile
 #define OBJ_LOCKED_BUT_VERSION_MATCH   14
@@ -199,6 +200,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *);
 prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);
 void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, int);
 int transPrefetchProcess(transrecord_t *, int **, short);
-void *sendPrefetchReq(prefetchpile_t*, int);
+void sendPrefetchReq(prefetchpile_t*, int);
+void getPrefetchResponse(int, int);
 /* end transactions */
 #endif
index 87f21dec75eda9cbcafb296e78a61ea6e2f4473d..33cbd581ba2fcdca9f80b273716bfbf8bd1be1b6 100644 (file)
@@ -530,20 +530,23 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
 }
 
 int prefetchReq(int acceptfd) {
-       int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size;
+       int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
        unsigned int oid, index = 0;
        char *ptr, buffer[PRE_BUF_SIZE];
        void *mobj;
-       unsigned int *oidnotfound, objoid;
-       char *header;
+       unsigned int objoid;
+       char *header, control;
        objheader_t * head;
        
        /* Repeatedly recv the oid and offset pairs sent for prefetch */
        while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
+               count++;
                if(length == -1)
                        break;
                sum = 0;
-               index = 0;
+               index = sizeof(unsigned int); // Index starts with sizeof  unsigned int because the 
+                                             // first 4 bytes are saved to send the
+                                             // size of the buffer (that is computed at the end of the loop)
                oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
                numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
                N = numoffset * sizeof(short);
@@ -564,7 +567,7 @@ int prefetchReq(int acceptfd) {
                        memcpy(buffer+index, &oid, sizeof(unsigned int));
                        index += sizeof(unsigned int);
                } else { /* If Obj found in machine (i.e. has not moved) */
-                       /* Return the oid ..its header and data */
+                       /* send the oid, it's size, it's header and data */
                        header = (char *) mobj;
                        head = (objheader_t *) header; 
                        size = sizeof(objheader_t) + sizeof(classsize[head->type]);
@@ -572,25 +575,30 @@ int prefetchReq(int acceptfd) {
                        index += sizeof(char);
                        memcpy(buffer+index, &oid, sizeof(unsigned int));
                        index += sizeof(unsigned int);
+                       memcpy(buffer+index, &size, sizeof(int));
+                       index += sizeof(int);
                        memcpy(buffer + index, header, size);
                        index += size;
                        /* Calculate the oid corresponding to the offset value */
                        for(i = 0 ; i< numoffset ; i++) {
                                objoid = *((int *)(header + sizeof(objheader_t) + offset[i]));
                                if((header = (char *) mhashSearch(objoid)) == NULL) {
-                                       /* Obj not found, send oid and its offsets */
+                                       /* Obj not found, send oid */
                                        *(buffer + index) = OBJECT_NOT_FOUND;
                                        index += sizeof(char);
                                        memcpy(buffer+index, &oid, sizeof(unsigned int));
                                        index += sizeof(unsigned int);
                                        break;
                                } else {/* Obj Found */
+                                       /* send the oid, it's size, it's header and data */
                                        head = (objheader_t *) header; 
                                        size = sizeof(objheader_t) + sizeof(classsize[head->type]);
                                        *(buffer + index) = OBJECT_FOUND;
                                        index += sizeof(char);
                                        memcpy(buffer+index, &oid, sizeof(unsigned int));
                                        index += sizeof(unsigned int);
+                                       memcpy(buffer+index, &size, sizeof(int));
+                                       index += sizeof(int);
                                        memcpy(buffer + index, header, size);
                                        index += size;
                                        continue;
@@ -602,12 +610,22 @@ int prefetchReq(int acceptfd) {
                        printf("Char buffer is overflowing\n");
                        return 1;
                }
-               /* Send the buffer with all oids found and not found */
+               /* Send Prefetch response control message only once*/
+               if(count == 1) {
+                       control = TRANS_PREFETCH_RESPONSE;
+                       if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
+                               perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
+                               return 1;
+                       }
+               }
+
+               /* Send the buffer size */
+               memcpy(buffer, &index, sizeof(unsigned int));
+               /* Send the entire buffer with its size and oids found and not found */
                if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) {
-                       perror("Error sending size of object\n");
+                       perror("Error sending oids found\n");
                        return 1;
                }
        }
-
        return 0;
 }
index ae8fd5ba75d1f57536368c71dd052fe9fb938782..fdbccf86a6faaa38fa75c5f48ecaee4b03383b27 100644 (file)
@@ -90,7 +90,7 @@ void transInit() {
        prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
        //Create prefetch cache lookup table
        if(prehashCreate(HASH_SIZE, LOADFACTOR))
-                       return; //Failure
+               return; //Failure
        //Initialize primary shared queue
        queueInit();
        //Initialize machine pile w/prefetch oids and offsets shared queue
@@ -137,7 +137,7 @@ transrecord_t *transStart()
        transrecord_t *tmp = malloc(sizeof(transrecord_t));
        tmp->cache = objstrCreate(1048576);
        tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
-       
+
        return tmp;
 }
 
@@ -150,12 +150,12 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
        void *objcopy;
        int size;
        void *buf;
-               /* Search local cache */
+       /* Search local cache */
        if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                /* Look up in machine lookup table  and copy  into cache*/
-//             tmp = mhashSearch(oid);
+               //              tmp = mhashSearch(oid);
                size = sizeof(objheader_t)+classsize[tmp->type];
                objcopy = objstrAlloc(record->cache, size);
                memcpy(objcopy, (void *)objheader, size);
@@ -216,10 +216,10 @@ plistnode_t *createPiles(transrecord_t *record) {
                        }
                        next = curr->next;
                        //Get machine location for object id
-                       
+
                        if ((machinenum = lhashSearch(curr->key)) == 0) {
-                              printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
-                              return NULL;
+                               printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
+                               return NULL;
                        }
 
                        if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
@@ -231,12 +231,12 @@ plistnode_t *createPiles(transrecord_t *record) {
                                printf("pInsert error %s, %d\n", __FILE__, __LINE__);
                                return NULL;
                        }
-                       
+
                        /* Check if local or not */
                        if((localmachinenum = mhashSearch(curr->key)) != NULL) { 
                                pile->local = 1; //True i.e. local
                        }
-               
+
                        curr = next;
                }
        }
@@ -269,14 +269,14 @@ int transCommit(transrecord_t *record) {
 
        /* Count the number of participants */
        pilecount = pCount(pile);
-               
+
        /* Create a list of machine ids(Participants) involved in transaction   */
        if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
                printf("Calloc error %s, %d\n", __FILE__, __LINE__);
                return 1;
        }               
        pListMid(pile, listmid);
-       
+
 
        /* Initialize thread variables,
         * Spawn a thread for each Participant involved in a transaction */
@@ -285,7 +285,7 @@ int transCommit(transrecord_t *record) {
        pthread_cond_t tcond;
        pthread_mutex_t tlock;
        pthread_mutex_t tlshrd;
-       
+
        thread_data_array_t *thread_data_array;
        thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
        local_thread_data_array_t *ltdata;
@@ -301,7 +301,7 @@ int transCommit(transrecord_t *record) {
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
        pthread_mutex_init(&tlock, NULL);
        pthread_cond_init(&tcond, NULL);
-       
+
        /* Process each machine pile */
        while(pile != NULL) {
                //Create transaction id
@@ -364,7 +364,7 @@ int transCommit(transrecord_t *record) {
                        return 1;
                }
        }
-       
+
        /* Free resources */    
        pthread_cond_destroy(&tcond);
        pthread_mutex_destroy(&tlock);
@@ -382,7 +382,7 @@ int transCommit(transrecord_t *record) {
                /* Retry the commiting transaction again */
                transCommit(record);
        }       
-       
+
        return 0;
 }
 
@@ -417,7 +417,7 @@ void *transRequest(void *threadarg) {
                perror("Error in connect for TRANS_REQUEST\n");
                return NULL;
        }
-       
+
        printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
        /* Send bytes of data with TRANS_REQUEST control message */
        if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
@@ -426,29 +426,29 @@ void *transRequest(void *threadarg) {
        }
        /* Send list of machines involved in the transaction */
        {
-         int size=sizeof(unsigned int)*tdata->pilecount;
-         if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
-           perror("Error sending list of machines for thread\n");
-           return NULL;
-         }
+               int size=sizeof(unsigned int)*tdata->pilecount;
+               if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
+                       perror("Error sending list of machines for thread\n");
+                       return NULL;
+               }
        }
        /* Send oids and version number tuples for objects that are read */
        {
-         int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
-         if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
-           perror("Error sending tuples for thread\n");
-           return NULL;
-         }
+               int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
+               if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
+                       perror("Error sending tuples for thread\n");
+                       return NULL;
+               }
        }
        /* Send objects that are modified */
        for(i = 0; i < tdata->buffer->f.nummod ; i++) {
-         int size;
-         headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
-         size=sizeof(objheader_t)+classsize[headeraddr->type];
-         if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
-           perror("Error sending obj modified for thread\n");
-           return NULL;
-         }
+               int size;
+               headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+               size=sizeof(objheader_t)+classsize[headeraddr->type];
+               if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
+                       perror("Error sending obj modified for thread\n");
+                       return NULL;
+               }
        }
 
        /* Read control message from Participant */
@@ -457,7 +457,7 @@ void *transRequest(void *threadarg) {
                return NULL;
        }
        recvcontrol = control;
-       
+
        /* Update common data structure and increment count */
        tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
 
@@ -517,7 +517,7 @@ int decideResponse(thread_data_array_t *tdata) {
                                printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
                                transagree++;
                                break;
-                               
+
                        case TRANS_SOFT_ABORT:
                                printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
                                transsoftabort++;
@@ -527,7 +527,7 @@ int decideResponse(thread_data_array_t *tdata) {
                                return -1;
                }
        }
-       
+
        /* Decide what control message to send to Participant */        
        if(transdisagree > 0) {
                /* Send Abort */
@@ -552,7 +552,7 @@ int decideResponse(thread_data_array_t *tdata) {
                printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
                return -1;
        }
-       
+
        return 0;
 }
 /* This function sends the final response to all threads in their respective socket id */
@@ -752,7 +752,7 @@ void *handleLocalReq(void *threadarg) {
                                        /* Send TRANS_DISAGREE to Coordinator */
                                        localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
                                        printf("DEBUG -> Sending TRANS_DISAGREE\n");
-                               //      return tdata->recvmsg[tdata->thread_id].rcv_status;  
+                                       //      return tdata->recvmsg[tdata->thread_id].rcv_status;  
                                }
                        }
                }
@@ -771,10 +771,10 @@ void *handleLocalReq(void *threadarg) {
                printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
                /* Send number of oids not found and the missing oids if objects are missing in the machine */
                /* TODO Remember to store the oidnotfound for later use
-               if(objnotfound != 0) {
-                       int size = sizeof(unsigned int)* objnotfound;
-               }
-               */
+                  if(objnotfound != 0) {
+                  int size = sizeof(unsigned int)* objnotfound;
+                  }
+                  */
        }
 
        /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
@@ -789,7 +789,7 @@ void *handleLocalReq(void *threadarg) {
 
        /*Set flag to show that common data structure for this individual thread has been written to */
        //*(tdata->localstatus) |= LM_UPDATED;
-       
+
        /* Lock and update count */
        //Thread sleeps until all messages from pariticipants are received by coordinator
        pthread_mutex_lock(localtdata->tdata->lock);
@@ -836,11 +836,11 @@ void *handleLocalReq(void *threadarg) {
                free(localtdata->transinfo->objnotfound);
                localtdata->transinfo->objnotfound = NULL;
        }
-       
+
        pthread_exit(NULL);
 }
 /* This function completes the ABORT process if the transaction is aborting 
- */
+*/
 int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) {
        char *ptr;
        int i;
@@ -871,44 +871,44 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
 }
 
 /*This function completes the COMMIT process is the transaction is commiting
- */
- int transComProcess(trans_commit_data_t *transinfo) {
-        objheader_t *header;
-        int i = 0, offset = 0;
-        char control;
-        
-        printf("DEBUG -> Recv TRANS_COMMIT\n");
-        /* Process each modified object saved in the mainobject store */
-        for(i=0; i<transinfo->nummod; i++) {
-                if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
-                        printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                }
-                /* Change reference count of older address and free space in objstr ?? */
-                header->rcount = 1; //TODO Not sure what would be the val
-
-                /* Change ptr address in mhash table */
-                mhashRemove(transinfo->objmod[i]);
-                mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
-                offset += sizeof(objheader_t) + classsize[header->type];
-
-                /* Update object version number */
-                header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
-                header->version += 1;
-        }
-
-        /* Unlock locked objects */
-        for(i=0; i<transinfo->numlocked; i++) {
-                header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
-                header->status &= ~(LOCK);
-        }
-
-        //TODO Update location lookup table
-        //TODO/* Unset the bit for local objects */
-
-        /* Send ack to Coordinator */
-        printf("DEBUG-> TRANS_SUCESSFUL\n");
-        return 0;
- }
+*/
+int transComProcess(trans_commit_data_t *transinfo) {
+       objheader_t *header;
+       int i = 0, offset = 0;
+       char control;
+
+       printf("DEBUG -> Recv TRANS_COMMIT\n");
+       /* Process each modified object saved in the mainobject store */
+       for(i=0; i<transinfo->nummod; i++) {
+               if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+                       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+               }
+               /* Change reference count of older address and free space in objstr ?? */
+               header->rcount = 1; //TODO Not sure what would be the val
+
+               /* Change ptr address in mhash table */
+               mhashRemove(transinfo->objmod[i]);
+               mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+               offset += sizeof(objheader_t) + classsize[header->type];
+
+               /* Update object version number */
+               header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+               header->version += 1;
+       }
+
+       /* Unlock locked objects */
+       for(i=0; i<transinfo->numlocked; i++) {
+               header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+               header->status &= ~(LOCK);
+       }
+
+       //TODO Update location lookup table
+       //TODO/* Unset the bit for local objects */
+
+       /* Send ack to Coordinator */
+       printf("DEBUG-> TRANS_SUCESSFUL\n");
+       return 0;
+}
 
 /* This function checks if the prefetch oids are same and have same offsets  
  * for case x.a.b and y.a.b where x and y have same oid's
@@ -923,7 +923,7 @@ void checkPrefetchTuples(prefetchqelem_t *node) {
        short *endoffsets, *arryfields; 
 
        /* Check for the case x.y.z and a.b.c are same oids */ 
-       ptr = (char *) node;
+       ptr = (char *) node;
        ntuples = *(GET_NTUPLES(ptr));
        oid = GET_PTR_OID(ptr);
        endoffsets = GET_PTR_EOFF(ptr, ntuples); 
@@ -1057,7 +1057,7 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
        prefetchpile_t *head = NULL;
 
        /* Check for the case x.y.z and a.b.c are same oids */ 
-       ptr = (char *) node;
+       ptr = (char *) node;
        ntuples = *(GET_NTUPLES(ptr));
        oid = GET_PTR_OID(ptr);
        endoffsets = GET_PTR_EOFF(ptr, ntuples); 
@@ -1093,7 +1093,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
        short *endoffsets, *arryfields; 
        prefetchpile_t *head = NULL;
 
-       ptr = (char *) node;
+       ptr = (char *) node;
        ntuples = *(GET_NTUPLES(ptr));
        oid = GET_PTR_OID(ptr);
        endoffsets = GET_PTR_EOFF(ptr, ntuples); 
@@ -1134,7 +1134,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
                                } else 
                                        flag = 0;
                        }
-               
+
                        /*If all offset oids are found locally,make the prefetch tuple invalid */
                        if(flag == 0) {
                                oid[i] = -1;
@@ -1177,7 +1177,7 @@ void *transPrefetch(void *t) {
                /* Check if the tuples are found locally, if yes then reduce them further*/ 
                /* and group requests by remote machine ids by calling the makePreGroups() */
                pilehead = foundLocal(qnode);
-               
+
                /* Lock mutex of pool queue */
                pthread_mutex_lock(&mcqueue.qlock);
                /* Update the pool queue with the new remote machine piles generated per prefetch call */
@@ -1213,12 +1213,10 @@ void *mcqProcess(void *threadid) {
                pthread_mutex_unlock(&mcqueue.qlock);
 
                /*Initiate connection to remote host and send request */ 
-               sendPrefetchReq(mcpilenode, tid);
                /* Process Request */
-
-
+               sendPrefetchReq(mcpilenode, tid);
                /* TODO: For each object not found query DHT for new location and retrieve the object */
-       
+
                /* Deallocate the dequeued node */
        }
 }
@@ -1244,7 +1242,7 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo
        pthread_t thread[numoids];
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-               
+
        /* Create  Machine Piles to send prefetch requests use threads*/
        for( i = 0 ; i< numoids ; i++) {
                if(arrayofoffset[i][0] == -1) 
@@ -1271,13 +1269,13 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo
                }
        }
        pthread_attr_destroy(&attr);
-       
+
        return 0;
-               
+
 }
 
 void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
-       int sd, i, offset, off, len, endpair;
+       int sd, i, offset, off, len, endpair, numoffsets, count = 0;
        struct sockaddr_in serv_addr;
        struct hostent *server;
        char machineip[16], control;
@@ -1314,6 +1312,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
        tmp = mcpilenode->objpiles;
        while(tmp != NULL) {
                off = offset = 0;
+               count++;  // Keeps track of the number of oid and offset tuples sent per remote machine
                len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
                char oidnoffset[len];
                memcpy(oidnoffset, &len, sizeof(int));
@@ -1331,6 +1330,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
                tmp = tmp->next;
        }
 
+       /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
        endpair = -1;
        if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
                perror("Error sending fixed bytes for thread\n");
@@ -1338,28 +1338,78 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
        }
 
        /* Get Response from the remote machine */
-       getPrefetchResponse();
-
-//     close(sd);
+       getPrefetchResponse(count,sd);
+       close(sd);
 }
 
-void getPrefetchResponse() {
-       int i;
-
-       /* Lock the Prefetch Cache look up table*/
-       pthread_mutex_lock(&pflookup.lock);
-
-       /*TODO For each object found add to Prefetch Cache */
-
-       /* Broadcast signal on prefetch cache condition variable */ 
-       pthread_cond_broadcast(&pflookup.qcond);
-       
-       /* Unlock the Prefetch Cache look up table*/
-       pthread_mutex_unlock(&pflookup.lock);
-
-
-
-
+void getPrefetchResponse(int count, int sd) {
+       int i = 0, val, n, N, sum, index, objsize;
+       unsigned int bufsize,oid;
+       char buffer[RECEIVE_BUFFER_SIZE], control;
+       char *ptr;
+       void *modptr;
 
+       /* Read  prefetch response from the Remote machine */
+       if((val = read(sd, &control, sizeof(char))) <= 0) {
+               perror("No control response for Prefetch request sent\n");
+               return;
+       }
+       if(control == TRANS_PREFETCH_RESPONSE) {
+               /*For each oid and offset tuple sent as prefetch request to remote machine*/
+               while(i < count) {
+                       /* Clear contents of buffer */
+                       memset(buffer, 0, RECEIVE_BUFFER_SIZE);
+                       sum = 0;
+                       index = 0;
+                       /* Read the size of buffer to be received */
+                       if((N = read(sd, buffer, sizeof(unsigned int))) <= 0) {
+                               perror("Size of buffer not recv\n");
+                               return;
+                       }
+                       memcpy(&bufsize, buffer, sizeof(unsigned int));
+                       ptr = buffer + sizeof(unsigned int);
+                       /* Keep receiving the buffer containing oid info */ 
+                       do {
+                               n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0);
+                               sum +=n;
+                       } while(sum < bufsize && n != 0);
+                       /* Decode the contents of the buffer */
+                       index = sizeof(unsigned int);
+                       while(index < (bufsize - sizeof(unsigned int))) {
+                               if(buffer[index] == OBJECT_FOUND) {
+                                       /* Increment it to get the object */
+                                       index += sizeof(char);
+                                       memcpy(&oid, buffer + index, sizeof(unsigned int));
+                                       index += sizeof(unsigned int);
+                                       /* Lock the Prefetch Cache look up table*/
+                                       pthread_mutex_lock(&pflookup.lock);
+                                       /* For each object found add to Prefetch Cache */
+                                       memcpy(&objsize, buffer + index, sizeof(int));
+                                       if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
+                                               printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+                                               return;
+                                       }
+                                       memcpy(modptr, buffer+index, objsize);
+                                       index += sizeof(int);
+                                       /* Add pointer and oid to hash table */
+                                       //TODO Do we need a version comparison herei ??
+                                       prehashInsert(oid, modptr);
+                                       /* Broadcast signal on prefetch cache condition variable */ 
+                                       pthread_cond_broadcast(&pflookup.cond);
+                                       /* Unlock the Prefetch Cache look up table*/
+                                       pthread_mutex_unlock(&pflookup.lock);
+                               } else if(buffer[index] == OBJECT_NOT_FOUND) {
+                                       /* Increment it to get the object */
+                                       // TODO If object not found, local machine takes inventory
+                                       index += sizeof(char);
+                                       memcpy(&oid, buffer + index, sizeof(unsigned int));
+                                       index += sizeof(unsigned int);
+                               } else 
+                                       printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__);
+                       }
 
+                       i++;
+               }
+       } else
+               printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);
 }