From: bdemsky Date: Fri, 11 Apr 2008 08:54:06 +0000 (+0000) Subject: fix bugs in sockpool...test and set has to be atomic X-Git-Tag: preEdgeChange~176 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=29f92e6d03e67063cdde089734faf41cfdd410fc;p=IRC.git fix bugs in sockpool...test and set has to be atomic --- diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 1f892e35..b17319b9 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -41,7 +41,7 @@ int dstmInit(void) return 1; //failure //Initialize socket pool - if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1, LOADFACTOR)) == NULL) { + if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1)) == NULL) { printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__); return 0; } @@ -606,157 +606,143 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock * then use offset values to prefetch references to other objects */ int prefetchReq(int acceptfd) { - int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0; - int length; - char *recvbuffer, *sendbuffer, control; - unsigned int oid, mid; - objheader_t *header; - struct sockaddr_in remoteAddr; - oidmidpair_t oidmid; - - do { - recv_data((int)acceptfd, &length, sizeof(int)); - if(length != -1) { - recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int)); - oid = oidmid.oid; - mid = oidmid.mid; - size = length - sizeof(int) - (2 * sizeof(unsigned int)); - numoffset = size/sizeof(short); - short offsetarry[numoffset]; - recv_data((int) acceptfd, offsetarry, size); - - int sd = -1; - if((sd = getSock(transPResponseSocketPool, mid)) == -1) { - printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__); - exit(-1); - } - - /*Process each oid */ - if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */ - /* Save the oids not found in buffer for later use */ - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; - if((sendbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - close(sd); - return -1; - } - *((int *) sendbuffer) = size; - *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; - *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; - - control = TRANS_PREFETCH_RESPONSE; - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - printf("Error: %s() in sending prefetch response at %s, %d\n", - __func__, __FILE__, __LINE__); - close(sd); - return -1; - } - } else { /* Object Found */ - int incr = 0; - GETSIZE(objsize, header); - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; - if((sendbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - close(sd); - return -1; - } - *((int *) (sendbuffer + incr)) = size; - incr += sizeof(int); - *((char *)(sendbuffer + incr)) = OBJECT_FOUND; - incr += sizeof(char); - *((unsigned int *)(sendbuffer+incr)) = oid; - incr += sizeof(unsigned int); - memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); - - control = TRANS_PREFETCH_RESPONSE; - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - printf("Error: %s() in sending prefetch response at %s, %d\n", - __func__, __FILE__, __LINE__); - close(sd); - return -1; - } - - /* Calculate the oid corresponding to the offset value */ - for(i = 0 ; i< numoffset ; i++) { - /* Check for arrays */ - if(TYPE(header) > NUMCLASSES) { - isArray = 1; - } - if(isArray == 1) { - int elementsize = classsize[TYPE(header)]; - struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); - unsigned short length = ao->___length___; - /* Check if array out of bounds */ - if(offsetarry[i]< 0 || offsetarry[i] >= length) { - break; - } - oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i]))); - } else { - oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i])); - } - - if((header = mhashSearch(oid)) == NULL) { - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; - if((sendbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - close(sd); - return -1; - } - *((int *) sendbuffer) = size; - *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; - *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; - - control = TRANS_PREFETCH_RESPONSE; - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - printf("Error: %s() in sending prefetch response at %s, %d\n", - __FILE__, __LINE__); - close(sd); - return -1; - } - break; - } else {/* Obj Found */ - int incr = 0; - GETSIZE(objsize, header); - size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; - if((sendbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__); - close(sd); - return -1; - } - *((int *) (sendbuffer + incr)) = size; - incr += sizeof(int); - *((char *)(sendbuffer + incr)) = OBJECT_FOUND; - incr += sizeof(char); - *((unsigned int *)(sendbuffer+incr)) = oid; - incr += sizeof(unsigned int); - memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); - - control = TRANS_PREFETCH_RESPONSE; - if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - printf("Error: %s() in sending prefetch response at %s, %d\n", - __func__, __FILE__, __LINE__); - close(sd); - return -1; - } - } - isArray = 0; - } - } - - //Release socket - int status; - if((status = freeSock(transPResponseSocketPool, mid, sd)) == -1) { - printf("Error: in releasing socket at %s line %d\n", __FILE__, __LINE__); - return -1; - } - } - } while (length != -1); - return 0; + int i, size, objsize, numoffset = 0; + int length; + char *recvbuffer, *sendbuffer, control; + unsigned int oid, mid=-1; + objheader_t *header; + oidmidpair_t oidmid; + int sd = -1; + + while(1) { + recv_data((int)acceptfd, &length, sizeof(int)); + if(length == -1) + break; + recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int)); + oid = oidmid.oid; + if (mid != oidmid.mid) { + if (mid!=-1) { + freeSockWithLock(transPResponseSocketPool, mid, sd); + } + mid=oidmid.mid; + if((sd = getSockWithLock(transPResponseSocketPool, mid)) == -1) { + printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__); + exit(-1); + } + } + size = length - sizeof(int) - (2 * sizeof(unsigned int)); + numoffset = size/sizeof(short); + short offsetarry[numoffset]; + recv_data((int) acceptfd, offsetarry, size); + + /*Process each oid */ + if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */ + /* Save the oids not found in buffer for later use */ + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; + sendbuffer = calloc(1, size); + *((int *) sendbuffer) = size; + *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; + *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; + control = TRANS_PREFETCH_RESPONSE; + + if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { + printf("Error: %s() in sending prefetch response at %s, %d\n", + __func__, __FILE__, __LINE__); + close(sd); + return -1; + } + } else { /* Object Found */ + int incr = 0; + GETSIZE(objsize, header); + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; + sendbuffer = calloc(1, size); + *((int *) (sendbuffer + incr)) = size; + incr += sizeof(int); + *((char *)(sendbuffer + incr)) = OBJECT_FOUND; + incr += sizeof(char); + *((unsigned int *)(sendbuffer+incr)) = oid; + incr += sizeof(unsigned int); + memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); + + control = TRANS_PREFETCH_RESPONSE; + if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { + printf("Error: %s() in sending prefetch response at %s, %d\n", + __func__, __FILE__, __LINE__); + close(sd); + return -1; + } + + /* Calculate the oid corresponding to the offset value */ + for(i = 0 ; i< numoffset ; i++) { + /* Check for arrays */ + if(TYPE(header) > NUMCLASSES) { + int elementsize = classsize[TYPE(header)]; + struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); + unsigned short length = ao->___length___; + /* Check if array out of bounds */ + if(offsetarry[i]< 0 || offsetarry[i] >= length) { + break; + } + oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i]))); + } else { + oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i])); + } + + if((header = mhashSearch(oid)) == NULL) { + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; + if((sendbuffer = calloc(1, size)) == NULL) { + printf("Calloc error at %s,%d\n", __FILE__, __LINE__); + close(sd); + return -1; + } + *((int *) sendbuffer) = size; + *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND; + *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid; + + control = TRANS_PREFETCH_RESPONSE; + if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { + printf("Error: %s() in sending prefetch response at %s, %d\n", + __FILE__, __LINE__); + close(sd); + return -1; + } + break; + } else {/* Obj Found */ + int incr = 0; + GETSIZE(objsize, header); + size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; + if((sendbuffer = calloc(1, size)) == NULL) { + printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__); + close(sd); + return -1; + } + *((int *) (sendbuffer + incr)) = size; + incr += sizeof(int); + *((char *)(sendbuffer + incr)) = OBJECT_FOUND; + incr += sizeof(char); + *((unsigned int *)(sendbuffer+incr)) = oid; + incr += sizeof(unsigned int); + memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t)); + + control = TRANS_PREFETCH_RESPONSE; + if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { + printf("Error: %s() in sending prefetch response at %s, %d\n", + __func__, __FILE__, __LINE__); + close(sd); + return -1; + } + } + } + } + } + //Release socket + if (mid!=-1) + freeSockWithLock(transPResponseSocketPool, mid, sd); + + return 0; } int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) { - int numbytes = 0; - send_data(sd, control, sizeof(char)); /* Send the buffer with its size */ int length = *(size); diff --git a/Robust/src/Runtime/DSTM/interface/sockpool.c b/Robust/src/Runtime/DSTM/interface/sockpool.c index 78e7272d..64337b25 100644 --- a/Robust/src/Runtime/DSTM/interface/sockpool.c +++ b/Robust/src/Runtime/DSTM/interface/sockpool.c @@ -23,7 +23,7 @@ inline static void UnLock(volatile unsigned int *addr) { #define MAXSPINS 1000 -inline void Lock(unsigned int *s) { +inline void Lock(volatile unsigned int *s) { while(test_and_set(s)) { int i=0; while(*s) { @@ -85,7 +85,7 @@ int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) { Lock(&sockhash->mylock); ptr=&sockhash->table[key]; - while(ptr!=NULL) { + while(*ptr!=NULL) { if (mid == (*ptr)->mid) { socknode_t *tmp=*ptr; sd = tmp->sd; @@ -114,7 +114,7 @@ int getSock(sockPoolHashTable_t *sockhash, unsigned int mid) { ptr=&sockhash->table[key]; - while(ptr!=NULL) { + while(*ptr!=NULL) { if (mid == (*ptr)->mid) { socknode_t *tmp=*ptr; sd = tmp->sd; @@ -129,14 +129,36 @@ int getSock(sockPoolHashTable_t *sockhash, unsigned int mid) { socknode_t *inusenode = calloc(1, sizeof(socknode_t)); inusenode->next=sockhash->inuse; sockhash->inuse=inusenode; - inusenode->next=sockhash; - sockhash=inusenode; return sd; } else { return -1; } } +int getSock2(sockPoolHashTable_t *sockhash, unsigned int mid) { + socknode_t **ptr; + int key = mid%(sockhash->size); + int sd; + + ptr=&sockhash->table[key]; + + while(*ptr!=NULL) { + if (mid == (*ptr)->mid) { + return (*ptr)->sd; + } + ptr=&((*ptr)->next); + } + if((sd = createNewSocket(mid)) != -1) { + *ptr=calloc(1, sizeof(socknode_t)); + (*ptr)->mid=mid; + (*ptr)->sd=sd; + return sd; + } else { + return -1; + } +} + + void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) { Lock(&sockhash->mylock); inusenode->next = sockhash->inuse; diff --git a/Robust/src/Runtime/DSTM/interface/sockpool.h b/Robust/src/Runtime/DSTM/interface/sockpool.h index 96d46840..bc1b4fab 100644 --- a/Robust/src/Runtime/DSTM/interface/sockpool.h +++ b/Robust/src/Runtime/DSTM/interface/sockpool.h @@ -17,18 +17,15 @@ typedef struct sockPoolHashTable { volatile unsigned int mylock; } sockPoolHashTable_t; -sockPoolHashTable_t *createSockPool(sockPoolHashTable_t *, unsigned int, float); +sockPoolHashTable_t *createSockPool(sockPoolHashTable_t *, unsigned int); int getSock(sockPoolHashTable_t *, unsigned int); +int getSock2(sockPoolHashTable_t *, unsigned int); int getSockWithLock(sockPoolHashTable_t *, unsigned int); -int freeSock(sockPoolHashTable_t *, unsigned int, int); -int freeSockWithLock(sockPoolHashTable_t *, unsigned int, int); +void freeSock(sockPoolHashTable_t *, unsigned int, int); +void freeSockWithLock(sockPoolHashTable_t *, unsigned int, int); void insToList(sockPoolHashTable_t *, socknode_t *); void insToListWithLock(sockPoolHashTable_t *, socknode_t *); int createNewSocket(unsigned int); -int CompareAndSwap(int *, int, int); -void InitLock(SpinLock *); -void Lock (SpinLock *); -void UnLock (SpinLock *); #if 0 /************************************************ diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 99e63561..155a6222 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -127,7 +127,7 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short if(ntuples > 0) { int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); char * node; - + if((node = calloc(1, qnodesize)) == NULL) { printf("Calloc Error %s, %d\n", __FILE__, __LINE__); return; @@ -163,40 +163,33 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short /* This function starts up the transaction runtime. */ int dstmStartup(const char * option) { - pthread_t thread_Listen; - pthread_attr_t attr; - int master=option!=NULL && strcmp(option, "master")==0; - - if (processConfigFile() != 0) - return 0; //TODO: return error value, cause main program to exit + pthread_t thread_Listen; + pthread_attr_t attr; + int master=option!=NULL && strcmp(option, "master")==0; + + if (processConfigFile() != 0) + return 0; //TODO: return error value, cause main program to exit #ifdef COMPILER - if (!master) - threadcount--; + if (!master) + threadcount--; #endif - - //Initialize socket pool - if((transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) { - printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__); - return 0; - } - if((transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) { - printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__); - return 0; - } - - dstmInit(); - transInit(); - - if (master) { - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - pthread_create(&thread_Listen, &attr, dstmListen, NULL); - return 1; - } else { - dstmListen(); - return 0; - } - + + //Initialize socket pool + transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1); + transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1); + + dstmInit(); + transInit(); + + if (master) { + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + pthread_create(&thread_Listen, &attr, dstmListen, NULL); + return 1; + } else { + dstmListen(); + return 0; + } } //TODO Use this later @@ -841,52 +834,37 @@ char sendResponse(thread_data_array_t *tdata, int sd) { * */ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { - int size, val; - struct sockaddr_in serv_addr; - char machineip[16]; - char control; - objheader_t *h; - void *objcopy = NULL; - - int sd; - if((sd = getSock(transReadSockPool, mnum)) == -1) { - printf("%s(): Error: no socket id in the pool of sockets at %s, %d\n", __func__, __FILE__, __LINE__); - return NULL; - } + int size, val; + struct sockaddr_in serv_addr; + char machineip[16]; + char control; + objheader_t *h; + void *objcopy = NULL; + + int sd = getSock2(transReadSockPool, mnum); + char readrequest[sizeof(char)+sizeof(unsigned int)]; + readrequest[0] = READ_REQUEST; + *((unsigned int *)(&readrequest[1])) = oid; + send_data(sd, readrequest, sizeof(readrequest)); + + /* Read response from the Participant */ + recv_data(sd, &control, sizeof(char)); + + if (control==OBJECT_NOT_FOUND) { + objcopy = NULL; + } else { + /* Read object if found into local cache */ + recv_data(sd, &size, sizeof(int)); + objcopy = objstrAlloc(record->cache, size); + recv_data(sd, objcopy, size); - char readrequest[sizeof(char)+sizeof(unsigned int)]; - readrequest[0] = READ_REQUEST; - *((unsigned int *)(&readrequest[1])) = oid; - send_data(sd, readrequest, sizeof(readrequest)); - - /* Read response from the Participant */ - recv_data(sd, &control, sizeof(char)); - - switch(control) { - case OBJECT_NOT_FOUND: - objcopy = NULL; - break; - case OBJECT_FOUND: - /* Read object if found into local cache */ - recv_data(sd, &size, sizeof(int)); - objcopy = objstrAlloc(record->cache, size); - recv_data(sd, objcopy, size); - - /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, oid, objcopy); - break; - default: - printf("Error: in recv response from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__); - break; - } - - int status; - if((status = freeSock(transReadSockPool, mnum, sd)) == -1) { - printf("Error in releasing socket at %s line %d\n", __FILE__, __LINE__); - return NULL; - } - - return objcopy; + /* Insert into cache's lookup table */ + chashInsert(record->lookupTable, oid, objcopy); + } + + // freeSock(transReadSockPool, mnum, sd); + + return objcopy; } /* This function handles the local objects involved in a transaction commiting process. @@ -1416,100 +1394,87 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, i /* This function is called by the thread calling transPrefetch */ void *transPrefetch(void *t) { - while(1) { - /* lock mutex of primary prefetch queue */ - pthread_mutex_lock(&pqueue.qlock); - /* while primary queue is empty, then wait */ - while((pqueue.front == NULL) && (pqueue.rear == NULL)) { - pthread_cond_wait(&pqueue.qcond, &pqueue.qlock); - } - - /* dequeue node to create a machine piles and finally unlock mutex */ - prefetchqelem_t *qnode; - if((qnode = pre_dequeue()) == NULL) { - printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&pqueue.qlock); - continue; - } - pthread_mutex_unlock(&pqueue.qlock); - - /* Reduce redundant prefetch requests */ - checkPrefetchTuples(qnode); - /* Check if the tuples are found locally, if yes then reduce them further*/ - /* and group requests by remote machine ids by calling the makePreGroups() */ - prefetchpile_t *pilehead = NULL; - if((pilehead = foundLocal(qnode)) == NULL) { - printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__); - pre_enqueue(qnode); - continue; - } - - // Get sock from shared pool - int sd = -1; - if((sd = getSock(transPrefetchSockPool, pilehead->mid)) == -1) { - printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__); - exit(-1); - } - - /* Send Prefetch Request */ - prefetchpile_t *ptr = pilehead; - while(ptr != NULL) { - sendPrefetchReq(ptr, sd); - ptr = ptr->next; - } - - /* Release socket */ - int status; - if((status = freeSock(transPrefetchSockPool, pilehead->mid, sd)) == -1) { - printf("Error: In realeasing socket at %s line %d\n", __FILE__, __LINE__); - return; - } - - /* Deallocated pilehead */ - mcdealloc(pilehead); - - // Deallocate the prefetch queue pile node - predealloc(qnode); - } + while(1) { + /* lock mutex of primary prefetch queue */ + pthread_mutex_lock(&pqueue.qlock); + /* while primary queue is empty, then wait */ + while((pqueue.front == NULL) && (pqueue.rear == NULL)) { + pthread_cond_wait(&pqueue.qcond, &pqueue.qlock); + } + + /* dequeue node to create a machine piles and finally unlock mutex */ + prefetchqelem_t *qnode; + if((qnode = pre_dequeue()) == NULL) { + printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&pqueue.qlock); + continue; + } + pthread_mutex_unlock(&pqueue.qlock); + + /* Reduce redundant prefetch requests */ + checkPrefetchTuples(qnode); + /* Check if the tuples are found locally, if yes then reduce them further*/ + /* and group requests by remote machine ids by calling the makePreGroups() */ + prefetchpile_t *pilehead = foundLocal(qnode); + + // Get sock from shared pool + int sd = getSock2(transPrefetchSockPool, pilehead->mid); + + /* Send Prefetch Request */ + prefetchpile_t *ptr = pilehead; + while(ptr != NULL) { + sendPrefetchReq(ptr, sd); + ptr = ptr->next; + } + + /* Release socket */ + // freeSock(transPrefetchSockPool, pilehead->mid, sd); + + /* Deallocated pilehead */ + mcdealloc(pilehead); + + // Deallocate the prefetch queue pile node + predealloc(qnode); + } } void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { - int off, len, endpair, count = 0; - char control; - objpile_t *tmp; - - /* Send TRANS_PREFETCH control message */ - control = TRANS_PREFETCH; - send_data(sd, &control, sizeof(char)); - - /* Send Oids and offsets in pairs */ - tmp = mcpilenode->objpiles; - while(tmp != NULL) { - off = 0; - count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */ - len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); - char oidnoffset[len]; - bzero(oidnoffset, len); - *((int*)oidnoffset) = len; - off = sizeof(int); - *((unsigned int *)(oidnoffset + off)) = tmp->oid; - off += sizeof(unsigned int); - *((unsigned int *)(oidnoffset + off)) = myIpAddr; - off += sizeof(unsigned int); - int i; - for(i = 0; i < tmp->numoffset; i++) { - *((short*)(oidnoffset + off)) = tmp->offset[i]; - off+=sizeof(short); - } - send_data(sd, oidnoffset, len); - tmp = tmp->next; - } - - /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */ - endpair = -1; - send_data(sd, &endpair, sizeof(int)); - - return; + int off, len, endpair, count = 0; + char control; + objpile_t *tmp; + + /* Send TRANS_PREFETCH control message */ + control = TRANS_PREFETCH; + send_data(sd, &control, sizeof(char)); + + /* Send Oids and offsets in pairs */ + tmp = mcpilenode->objpiles; + while(tmp != NULL) { + off = 0; + count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */ + len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + char oidnoffset[len]; + bzero(oidnoffset, len); + *((int*)oidnoffset) = len; + off = sizeof(int); + *((unsigned int *)(oidnoffset + off)) = tmp->oid; + off += sizeof(unsigned int); + *((unsigned int *)(oidnoffset + off)) = myIpAddr; + off += sizeof(unsigned int); + int i; + for(i = 0; i < tmp->numoffset; i++) { + *((short*)(oidnoffset + off)) = tmp->offset[i]; + off+=sizeof(short); + } + send_data(sd, oidnoffset, len); + tmp = tmp->next; + } + + /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */ + endpair = -1; + send_data(sd, &endpair, sizeof(int)); + + return; } int getPrefetchResponse(int sd) {