From 9ac1cba43ce73268dc19d7c82707e7e2d8daf9fd Mon Sep 17 00:00:00 2001 From: adash Date: Mon, 6 Aug 2007 08:18:23 +0000 Subject: [PATCH] pool thread processing complete TODO deallocate dequeued nodes --- Robust/src/Runtime/DSTM/interface/dstm.h | 4 +- .../src/Runtime/DSTM/interface/dstmserver.c | 36 ++- Robust/src/Runtime/DSTM/interface/trans.c | 282 +++++++++++------- 3 files changed, 196 insertions(+), 126 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index f2f477fa..363f2f1d 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -32,6 +32,7 @@ #define TRANS_AGREE_BUT_MISSING_OBJECTS 19 #define TRANS_SOFT_ABORT 20 #define TRANS_SUCESSFUL 21 +#define TRANS_PREFETCH_RESPONSE 22 //Control bits for status of objects in Machine pile #define OBJ_LOCKED_BUT_VERSION_MATCH 14 @@ -199,6 +200,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *); prefetchpile_t *makePreGroups(prefetchqelem_t *, int *); void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, int); int transPrefetchProcess(transrecord_t *, int **, short); -void *sendPrefetchReq(prefetchpile_t*, int); +void sendPrefetchReq(prefetchpile_t*, int); +void getPrefetchResponse(int, int); /* end transactions */ #endif diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 87f21dec..33cbd581 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -530,20 +530,23 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { } int prefetchReq(int acceptfd) { - int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size; + int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0; unsigned int oid, index = 0; char *ptr, buffer[PRE_BUF_SIZE]; void *mobj; - unsigned int *oidnotfound, objoid; - char *header; + unsigned int objoid; + char *header, control; objheader_t * head; /* Repeatedly recv the oid and offset pairs sent for prefetch */ while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) { + count++; if(length == -1) break; sum = 0; - index = 0; + index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the + // first 4 bytes are saved to send the + // size of the buffer (that is computed at the end of the loop) oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0); numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short); N = numoffset * sizeof(short); @@ -564,7 +567,7 @@ int prefetchReq(int acceptfd) { memcpy(buffer+index, &oid, sizeof(unsigned int)); index += sizeof(unsigned int); } else { /* If Obj found in machine (i.e. has not moved) */ - /* Return the oid ..its header and data */ + /* send the oid, it's size, it's header and data */ header = (char *) mobj; head = (objheader_t *) header; size = sizeof(objheader_t) + sizeof(classsize[head->type]); @@ -572,25 +575,30 @@ int prefetchReq(int acceptfd) { index += sizeof(char); memcpy(buffer+index, &oid, sizeof(unsigned int)); index += sizeof(unsigned int); + memcpy(buffer+index, &size, sizeof(int)); + index += sizeof(int); memcpy(buffer + index, header, size); index += size; /* Calculate the oid corresponding to the offset value */ for(i = 0 ; i< numoffset ; i++) { objoid = *((int *)(header + sizeof(objheader_t) + offset[i])); if((header = (char *) mhashSearch(objoid)) == NULL) { - /* Obj not found, send oid and its offsets */ + /* Obj not found, send oid */ *(buffer + index) = OBJECT_NOT_FOUND; index += sizeof(char); memcpy(buffer+index, &oid, sizeof(unsigned int)); index += sizeof(unsigned int); break; } else {/* Obj Found */ + /* send the oid, it's size, it's header and data */ head = (objheader_t *) header; size = sizeof(objheader_t) + sizeof(classsize[head->type]); *(buffer + index) = OBJECT_FOUND; index += sizeof(char); memcpy(buffer+index, &oid, sizeof(unsigned int)); index += sizeof(unsigned int); + memcpy(buffer+index, &size, sizeof(int)); + index += sizeof(int); memcpy(buffer + index, header, size); index += size; continue; @@ -602,12 +610,22 @@ int prefetchReq(int acceptfd) { printf("Char buffer is overflowing\n"); return 1; } - /* Send the buffer with all oids found and not found */ + /* Send Prefetch response control message only once*/ + if(count == 1) { + control = TRANS_PREFETCH_RESPONSE; + if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { + perror("Error in sending PREFETCH RESPONSE to Coordinator\n"); + return 1; + } + } + + /* Send the buffer size */ + memcpy(buffer, &index, sizeof(unsigned int)); + /* Send the entire buffer with its size and oids found and not found */ if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) { - perror("Error sending size of object\n"); + perror("Error sending oids found\n"); return 1; } } - return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index ae8fd5ba..fdbccf86 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -90,7 +90,7 @@ void transInit() { prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE); //Create prefetch cache lookup table if(prehashCreate(HASH_SIZE, LOADFACTOR)) - return; //Failure + return; //Failure //Initialize primary shared queue queueInit(); //Initialize machine pile w/prefetch oids and offsets shared queue @@ -137,7 +137,7 @@ transrecord_t *transStart() transrecord_t *tmp = malloc(sizeof(transrecord_t)); tmp->cache = objstrCreate(1048576); tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); - + return tmp; } @@ -150,12 +150,12 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) void *objcopy; int size; void *buf; - /* Search local cache */ + /* Search local cache */ if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { /* Look up in machine lookup table and copy into cache*/ -// tmp = mhashSearch(oid); + // tmp = mhashSearch(oid); size = sizeof(objheader_t)+classsize[tmp->type]; objcopy = objstrAlloc(record->cache, size); memcpy(objcopy, (void *)objheader, size); @@ -216,10 +216,10 @@ plistnode_t *createPiles(transrecord_t *record) { } next = curr->next; //Get machine location for object id - + if ((machinenum = lhashSearch(curr->key)) == 0) { - printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); - return NULL; + printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); + return NULL; } if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { @@ -231,12 +231,12 @@ plistnode_t *createPiles(transrecord_t *record) { printf("pInsert error %s, %d\n", __FILE__, __LINE__); return NULL; } - + /* Check if local or not */ if((localmachinenum = mhashSearch(curr->key)) != NULL) { pile->local = 1; //True i.e. local } - + curr = next; } } @@ -269,14 +269,14 @@ int transCommit(transrecord_t *record) { /* Count the number of participants */ pilecount = pCount(pile); - + /* Create a list of machine ids(Participants) involved in transaction */ if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); return 1; } pListMid(pile, listmid); - + /* Initialize thread variables, * Spawn a thread for each Participant involved in a transaction */ @@ -285,7 +285,7 @@ int transCommit(transrecord_t *record) { pthread_cond_t tcond; pthread_mutex_t tlock; pthread_mutex_t tlshrd; - + thread_data_array_t *thread_data_array; thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount); local_thread_data_array_t *ltdata; @@ -301,7 +301,7 @@ int transCommit(transrecord_t *record) { pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_mutex_init(&tlock, NULL); pthread_cond_init(&tcond, NULL); - + /* Process each machine pile */ while(pile != NULL) { //Create transaction id @@ -364,7 +364,7 @@ int transCommit(transrecord_t *record) { return 1; } } - + /* Free resources */ pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); @@ -382,7 +382,7 @@ int transCommit(transrecord_t *record) { /* Retry the commiting transaction again */ transCommit(record); } - + return 0; } @@ -417,7 +417,7 @@ void *transRequest(void *threadarg) { perror("Error in connect for TRANS_REQUEST\n"); return NULL; } - + printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip); /* Send bytes of data with TRANS_REQUEST control message */ if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) { @@ -426,29 +426,29 @@ void *transRequest(void *threadarg) { } /* Send list of machines involved in the transaction */ { - int size=sizeof(unsigned int)*tdata->pilecount; - if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) { - perror("Error sending list of machines for thread\n"); - return NULL; - } + int size=sizeof(unsigned int)*tdata->pilecount; + if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) { + perror("Error sending list of machines for thread\n"); + return NULL; + } } /* Send oids and version number tuples for objects that are read */ { - int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread; - if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) { - perror("Error sending tuples for thread\n"); - return NULL; - } + int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread; + if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) { + perror("Error sending tuples for thread\n"); + return NULL; + } } /* Send objects that are modified */ for(i = 0; i < tdata->buffer->f.nummod ; i++) { - int size; - headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); - size=sizeof(objheader_t)+classsize[headeraddr->type]; - if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) { - perror("Error sending obj modified for thread\n"); - return NULL; - } + int size; + headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); + size=sizeof(objheader_t)+classsize[headeraddr->type]; + if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) { + perror("Error sending obj modified for thread\n"); + return NULL; + } } /* Read control message from Participant */ @@ -457,7 +457,7 @@ void *transRequest(void *threadarg) { return NULL; } recvcontrol = control; - + /* Update common data structure and increment count */ tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; @@ -517,7 +517,7 @@ int decideResponse(thread_data_array_t *tdata) { printf("DEBUG-> trans.c Recv TRANS_AGREE\n"); transagree++; break; - + case TRANS_SOFT_ABORT: printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n"); transsoftabort++; @@ -527,7 +527,7 @@ int decideResponse(thread_data_array_t *tdata) { return -1; } } - + /* Decide what control message to send to Participant */ if(transdisagree > 0) { /* Send Abort */ @@ -552,7 +552,7 @@ int decideResponse(thread_data_array_t *tdata) { printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__); return -1; } - + return 0; } /* This function sends the final response to all threads in their respective socket id */ @@ -752,7 +752,7 @@ void *handleLocalReq(void *threadarg) { /* Send TRANS_DISAGREE to Coordinator */ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; printf("DEBUG -> Sending TRANS_DISAGREE\n"); - // return tdata->recvmsg[tdata->thread_id].rcv_status; + // return tdata->recvmsg[tdata->thread_id].rcv_status; } } } @@ -771,10 +771,10 @@ void *handleLocalReq(void *threadarg) { printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); /* Send number of oids not found and the missing oids if objects are missing in the machine */ /* TODO Remember to store the oidnotfound for later use - if(objnotfound != 0) { - int size = sizeof(unsigned int)* objnotfound; - } - */ + if(objnotfound != 0) { + int size = sizeof(unsigned int)* objnotfound; + } + */ } /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process @@ -789,7 +789,7 @@ void *handleLocalReq(void *threadarg) { /*Set flag to show that common data structure for this individual thread has been written to */ //*(tdata->localstatus) |= LM_UPDATED; - + /* Lock and update count */ //Thread sleeps until all messages from pariticipants are received by coordinator pthread_mutex_lock(localtdata->tdata->lock); @@ -836,11 +836,11 @@ void *handleLocalReq(void *threadarg) { free(localtdata->transinfo->objnotfound); localtdata->transinfo->objnotfound = NULL; } - + pthread_exit(NULL); } /* This function completes the ABORT process if the transaction is aborting - */ +*/ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) { char *ptr; int i; @@ -871,44 +871,44 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int } /*This function completes the COMMIT process is the transaction is commiting - */ - int transComProcess(trans_commit_data_t *transinfo) { - objheader_t *header; - int i = 0, offset = 0; - char control; - - printf("DEBUG -> Recv TRANS_COMMIT\n"); - /* Process each modified object saved in the mainobject store */ - for(i=0; inummod; i++) { - if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) { - printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - } - /* Change reference count of older address and free space in objstr ?? */ - header->rcount = 1; //TODO Not sure what would be the val - - /* Change ptr address in mhash table */ - mhashRemove(transinfo->objmod[i]); - mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset)); - offset += sizeof(objheader_t) + classsize[header->type]; - - /* Update object version number */ - header = (objheader_t *) mhashSearch(transinfo->objmod[i]); - header->version += 1; - } - - /* Unlock locked objects */ - for(i=0; inumlocked; i++) { - header = (objheader_t *) mhashSearch(transinfo->objlocked[i]); - header->status &= ~(LOCK); - } - - //TODO Update location lookup table - //TODO/* Unset the bit for local objects */ - - /* Send ack to Coordinator */ - printf("DEBUG-> TRANS_SUCESSFUL\n"); - return 0; - } +*/ +int transComProcess(trans_commit_data_t *transinfo) { + objheader_t *header; + int i = 0, offset = 0; + char control; + + printf("DEBUG -> Recv TRANS_COMMIT\n"); + /* Process each modified object saved in the mainobject store */ + for(i=0; inummod; i++) { + if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) { + printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + } + /* Change reference count of older address and free space in objstr ?? */ + header->rcount = 1; //TODO Not sure what would be the val + + /* Change ptr address in mhash table */ + mhashRemove(transinfo->objmod[i]); + mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset)); + offset += sizeof(objheader_t) + classsize[header->type]; + + /* Update object version number */ + header = (objheader_t *) mhashSearch(transinfo->objmod[i]); + header->version += 1; + } + + /* Unlock locked objects */ + for(i=0; inumlocked; i++) { + header = (objheader_t *) mhashSearch(transinfo->objlocked[i]); + header->status &= ~(LOCK); + } + + //TODO Update location lookup table + //TODO/* Unset the bit for local objects */ + + /* Send ack to Coordinator */ + printf("DEBUG-> TRANS_SUCESSFUL\n"); + return 0; +} /* This function checks if the prefetch oids are same and have same offsets * for case x.a.b and y.a.b where x and y have same oid's @@ -923,7 +923,7 @@ void checkPrefetchTuples(prefetchqelem_t *node) { short *endoffsets, *arryfields; /* Check for the case x.y.z and a.b.c are same oids */ - ptr = (char *) node; + ptr = (char *) node; ntuples = *(GET_NTUPLES(ptr)); oid = GET_PTR_OID(ptr); endoffsets = GET_PTR_EOFF(ptr, ntuples); @@ -1057,7 +1057,7 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) { prefetchpile_t *head = NULL; /* Check for the case x.y.z and a.b.c are same oids */ - ptr = (char *) node; + ptr = (char *) node; ntuples = *(GET_NTUPLES(ptr)); oid = GET_PTR_OID(ptr); endoffsets = GET_PTR_EOFF(ptr, ntuples); @@ -1093,7 +1093,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { short *endoffsets, *arryfields; prefetchpile_t *head = NULL; - ptr = (char *) node; + ptr = (char *) node; ntuples = *(GET_NTUPLES(ptr)); oid = GET_PTR_OID(ptr); endoffsets = GET_PTR_EOFF(ptr, ntuples); @@ -1134,7 +1134,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { } else flag = 0; } - + /*If all offset oids are found locally,make the prefetch tuple invalid */ if(flag == 0) { oid[i] = -1; @@ -1177,7 +1177,7 @@ void *transPrefetch(void *t) { /* Check if the tuples are found locally, if yes then reduce them further*/ /* and group requests by remote machine ids by calling the makePreGroups() */ pilehead = foundLocal(qnode); - + /* Lock mutex of pool queue */ pthread_mutex_lock(&mcqueue.qlock); /* Update the pool queue with the new remote machine piles generated per prefetch call */ @@ -1213,12 +1213,10 @@ void *mcqProcess(void *threadid) { pthread_mutex_unlock(&mcqueue.qlock); /*Initiate connection to remote host and send request */ - sendPrefetchReq(mcpilenode, tid); /* Process Request */ - - + sendPrefetchReq(mcpilenode, tid); /* TODO: For each object not found query DHT for new location and retrieve the object */ - + /* Deallocate the dequeued node */ } } @@ -1244,7 +1242,7 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo pthread_t thread[numoids]; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - + /* Create Machine Piles to send prefetch requests use threads*/ for( i = 0 ; i< numoids ; i++) { if(arrayofoffset[i][0] == -1) @@ -1271,13 +1269,13 @@ int transPrefetchProcess(transrecord_t *record, int *arrayofoffset[], short numo } } pthread_attr_destroy(&attr); - + return 0; - + } void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { - int sd, i, offset, off, len, endpair; + int sd, i, offset, off, len, endpair, numoffsets, count = 0; struct sockaddr_in serv_addr; struct hostent *server; char machineip[16], control; @@ -1314,6 +1312,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { tmp = mcpilenode->objpiles; while(tmp != NULL) { off = offset = 0; + count++; // Keeps track of the number of oid and offset tuples sent per remote machine len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); char oidnoffset[len]; memcpy(oidnoffset, &len, sizeof(int)); @@ -1331,6 +1330,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { tmp = tmp->next; } + /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */ endpair = -1; if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) { perror("Error sending fixed bytes for thread\n"); @@ -1338,28 +1338,78 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { } /* Get Response from the remote machine */ - getPrefetchResponse(); - -// close(sd); + getPrefetchResponse(count,sd); + close(sd); } -void getPrefetchResponse() { - int i; - - /* Lock the Prefetch Cache look up table*/ - pthread_mutex_lock(&pflookup.lock); - - /*TODO For each object found add to Prefetch Cache */ - - /* Broadcast signal on prefetch cache condition variable */ - pthread_cond_broadcast(&pflookup.qcond); - - /* Unlock the Prefetch Cache look up table*/ - pthread_mutex_unlock(&pflookup.lock); - - - - +void getPrefetchResponse(int count, int sd) { + int i = 0, val, n, N, sum, index, objsize; + unsigned int bufsize,oid; + char buffer[RECEIVE_BUFFER_SIZE], control; + char *ptr; + void *modptr; + /* Read prefetch response from the Remote machine */ + if((val = read(sd, &control, sizeof(char))) <= 0) { + perror("No control response for Prefetch request sent\n"); + return; + } + if(control == TRANS_PREFETCH_RESPONSE) { + /*For each oid and offset tuple sent as prefetch request to remote machine*/ + while(i < count) { + /* Clear contents of buffer */ + memset(buffer, 0, RECEIVE_BUFFER_SIZE); + sum = 0; + index = 0; + /* Read the size of buffer to be received */ + if((N = read(sd, buffer, sizeof(unsigned int))) <= 0) { + perror("Size of buffer not recv\n"); + return; + } + memcpy(&bufsize, buffer, sizeof(unsigned int)); + ptr = buffer + sizeof(unsigned int); + /* Keep receiving the buffer containing oid info */ + do { + n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0); + sum +=n; + } while(sum < bufsize && n != 0); + /* Decode the contents of the buffer */ + index = sizeof(unsigned int); + while(index < (bufsize - sizeof(unsigned int))) { + if(buffer[index] == OBJECT_FOUND) { + /* Increment it to get the object */ + index += sizeof(char); + memcpy(&oid, buffer + index, sizeof(unsigned int)); + index += sizeof(unsigned int); + /* Lock the Prefetch Cache look up table*/ + pthread_mutex_lock(&pflookup.lock); + /* For each object found add to Prefetch Cache */ + memcpy(&objsize, buffer + index, sizeof(int)); + if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) { + printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); + return; + } + memcpy(modptr, buffer+index, objsize); + index += sizeof(int); + /* Add pointer and oid to hash table */ + //TODO Do we need a version comparison herei ?? + prehashInsert(oid, modptr); + /* Broadcast signal on prefetch cache condition variable */ + pthread_cond_broadcast(&pflookup.cond); + /* Unlock the Prefetch Cache look up table*/ + pthread_mutex_unlock(&pflookup.lock); + } else if(buffer[index] == OBJECT_NOT_FOUND) { + /* Increment it to get the object */ + // TODO If object not found, local machine takes inventory + index += sizeof(char); + memcpy(&oid, buffer + index, sizeof(unsigned int)); + index += sizeof(unsigned int); + } else + printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__); + } + i++; + } + } else + printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__); } -- 2.34.1