+ int sd, i, n;
+ struct sockaddr_in serv_addr;
+ thread_data_array_t *tdata;
+ objheader_t *headeraddr;
+ char control, recvcontrol;
+ char machineip[16], retval;
+
+ tdata = (thread_data_array_t *) threadarg;
+
+ /* Send Trans Request */
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("Error in socket for TRANS_REQUEST\n");
+ pthread_exit(NULL);
+ }
+ bzero((char*) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_port = htons(LISTEN_PORT);
+ serv_addr.sin_addr.s_addr = htonl(tdata->mid);
+
+ /* Open Connection */
+ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
+ perror("Error in connect for TRANS_REQUEST\n");
+ close(sd);
+ pthread_exit(NULL);
+ }
+
+ /* Send bytes of data with TRANS_REQUEST control message */
+ send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
+
+ /* Send list of machines involved in the transaction */
+ {
+ int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
+ send_data(sd, tdata->buffer->listmid, size);
+ }
+
+ /* Send oids and version number tuples for objects that are read */
+ {
+ int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
+ send_data(sd, tdata->buffer->objread, size);
+ }
+
+ /* Send objects that are modified */
+ for(i = 0; i < tdata->buffer->f.nummod ; i++) {
+ int size;
+ headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+ GETSIZE(size,headeraddr);
+ size+=sizeof(objheader_t);
+ send_data(sd, headeraddr, size);
+ }
+
+ /* Read control message from Participant */
+ recv_data(sd, &control, sizeof(char));
+ /* Recv Objects if participant sends TRANS_DISAGREE */
+ if(control == TRANS_DISAGREE) {
+ int length;
+ recv_data(sd, &length, sizeof(int));
+ void *newAddr;
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((newAddr = objstrAlloc(prefetchcache, length)) == NULL) {
+ printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ close(sd);
+ pthread_exit(NULL);
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ recv_data(sd, newAddr, length);
+ int offset = 0;
+ while(length != 0) {
+ unsigned int oidToPrefetch;
+ objheader_t * header;
+ header = (objheader_t *) (((char *)newAddr) + offset);
+ oidToPrefetch = OID(header);
+ int size = 0;
+ GETSIZE(size, header);
+ size += sizeof(objheader_t);
+ //make an entry in prefetch hash table
+ void *oldptr;
+ if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
+ prehashRemove(oidToPrefetch);
+ prehashInsert(oidToPrefetch, header);
+ } else {
+ prehashInsert(oidToPrefetch, header);
+ }
+ length = length - size;
+ offset += size;
+ }
+ }
+ recvcontrol = control;
+ /* Update common data structure and increment count */
+ tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
+
+ /* Lock and update count */
+ /* Thread sleeps until all messages from pariticipants are received by coordinator */
+ pthread_mutex_lock(tdata->lock);
+
+ (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
+
+ /* Wake up the threads and invoke decideResponse (once) */
+ if(*(tdata->count) == tdata->buffer->f.mcount) {
+ decideResponse(tdata);
+ pthread_cond_broadcast(tdata->threshold);
+ } else {
+ pthread_cond_wait(tdata->threshold, tdata->lock);
+ }
+ pthread_mutex_unlock(tdata->lock);
+
+ /* Send the final response such as TRANS_COMMIT or TRANS_ABORT
+ * to all participants in their respective socket */
+ if (sendResponse(tdata, sd) == 0) {
+ printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
+ close(sd);
+ pthread_exit(NULL);
+ }
+
+ recv_data((int)sd, &control, sizeof(char));
+
+ if(control == TRANS_UNSUCESSFUL) {
+ //printf("DEBUG-> TRANS_ABORTED\n");
+ } else if(control == TRANS_SUCESSFUL) {
+ //printf("DEBUG-> TRANS_SUCCESSFUL\n");
+ } else {
+ //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
+ }
+
+ /* Close connection */
+ close(sd);
+ pthread_exit(NULL);
+}
+
+/* This function decides the reponse that needs to be sent to
+ * all Participant machines after the TRANS_REQUEST protocol */
+void decideResponse(thread_data_array_t *tdata) {
+ char control;
+ int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
+ message to send */
+
+ for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
+ control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
+ written onto the shared array */
+ switch(control) {
+ default:
+ printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+ /* treat as disagree, pass thru */
+ case TRANS_DISAGREE:
+ transdisagree++;
+ break;
+
+ case TRANS_AGREE:
+ transagree++;
+ break;
+
+ case TRANS_SOFT_ABORT:
+ transsoftabort++;
+ break;
+ }
+ }
+
+ if(transdisagree > 0) {
+ /* Send Abort */
+ *(tdata->replyctrl) = TRANS_ABORT;
+ *(tdata->replyretry) = 0;
+ /* clear objects from prefetch cache */
+ for (i = 0; i < tdata->buffer->f.numread; i++) {
+ prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
+ }
+ for (i = 0; i < tdata->buffer->f.nummod; i++) {
+ prehashRemove(tdata->buffer->oidmod[i]);
+ }
+ } else if(transagree == tdata->buffer->f.mcount){
+ /* Send Commit */
+ *(tdata->replyctrl) = TRANS_COMMIT;
+ *(tdata->replyretry) = 0;
+ /* update prefetch cache */
+ /* For objects read */
+ char oidType;
+ int retval;
+ oidType = 'R';
+ if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
+ oidType = 'M';
+ if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
+ /* Invalidate objects in other machine cache */
+ if(tdata->buffer->f.nummod > 0) {
+ if((retval = invalidateObj(tdata)) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
+ }
+ } else {
+ /* Send Abort in soft abort case followed by retry commiting transaction again*/
+ *(tdata->replyctrl) = TRANS_ABORT;
+ *(tdata->replyretry) = 1;
+ }
+ return;
+}
+
+/* This function updates the prefetch cache when commiting objects
+ * based on the type of oid i.e. if oid is read or oid is modified
+ * Return -1 on error else returns 0
+ */
+int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
+ int i;
+ for (i = 0; i < numoid; i++) {
+ //find address object
+ objheader_t *header, *newAddr;
+ int size;
+ unsigned int oid;
+ if(oidType == 'R') {
+ oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i));
+ } else {
+ oid = tdata->buffer->oidmod[i];
+ }
+ pthread_mutex_lock(&prefetchcache_mutex);
+ header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
+ //copy object into prefetch cache
+ GETSIZE(size, header);
+ if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) {
+ printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ memcpy(newAddr, header, (size + sizeof(objheader_t)));
+ //make an entry in prefetch hash table
+ void *oldptr;
+ if((oldptr = prehashSearch(oid)) != NULL) {
+ prehashRemove(oid);
+ prehashInsert(oid, newAddr);
+ } else {
+ prehashInsert(oid, newAddr);
+ }
+ }
+ return 0;
+}
+
+
+/* This function sends the final response to remote machines per
+ * thread in their respective socket id It returns a char that is only
+ * needed to check the correctness of execution of this function
+ * inside transRequest()*/
+
+char sendResponse(thread_data_array_t *tdata, int sd) {
+ int n, size, sum, oidcount = 0, control;
+ char *ptr, retval = 0;
+ unsigned int *oidnotfound;
+
+ control = *(tdata->replyctrl);
+ send_data(sd, &control, sizeof(char));
+
+ //TODO read missing objects during object migration
+ /* If response is a soft abort due to missing objects at the
+ Participant's side */
+
+ /* If the decided response is TRANS_ABORT */
+ if(*(tdata->replyctrl) == TRANS_ABORT) {
+ retval = TRANS_ABORT;
+ } else if(*(tdata->replyctrl) == TRANS_COMMIT) {
+ /* If the decided response is TRANS_COMMIT */
+ retval = TRANS_COMMIT;
+ }
+
+ return retval;
+}
+
+/* This function opens a connection, places an object read request to
+ * the remote machine, reads the control message and object if
+ * available and copies the object and its header to the local
+ * cache. */
+
+void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
+ int size, val;
+ struct sockaddr_in serv_addr;
+ char machineip[16];
+ char control;
+ objheader_t *h;
+ void *objcopy = NULL;
+
+ int sd = getSock2(transReadSockPool, mnum);
+ char readrequest[sizeof(char)+sizeof(unsigned int)];
+ readrequest[0] = READ_REQUEST;
+ *((unsigned int *)(&readrequest[1])) = oid;
+ send_data(sd, readrequest, sizeof(readrequest));
+
+ /* Read response from the Participant */
+ recv_data(sd, &control, sizeof(char));
+
+ if (control==OBJECT_NOT_FOUND) {
+ objcopy = NULL;
+ } else {
+ /* Read object if found into local cache */
+ recv_data(sd, &size, sizeof(int));
+ objcopy = objstrAlloc(record->cache, size);
+ recv_data(sd, objcopy, size);
+ /* Insert into cache's lookup table */
+ chashInsert(record->lookupTable, oid, objcopy);
+ }
+
+ return objcopy;
+}
+
+/* This function handles the local objects involved in a transaction
+ * commiting process. It also makes a decision if this local machine
+ * sends AGREE or DISAGREE or SOFT_ABORT to coordinator. Note
+ * Coordinator = local machine It wakes up the other threads from
+ * remote participants that are waiting for the coordinator's decision
+ * and based on common agreement it either commits or aborts the
+ * transaction. It also frees the memory resources */
+
+void *handleLocalReq(void *threadarg) {
+ unsigned int *oidnotfound = NULL, *oidlocked = NULL;
+ local_thread_data_array_t *localtdata;
+ int numoidnotfound = 0, numoidlocked = 0;
+ int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
+ int numread, i;
+ unsigned int oid;
+ unsigned short version;
+ void *mobj;
+ objheader_t *headptr;
+
+ localtdata = (local_thread_data_array_t *) threadarg;
+
+ /* Counters and arrays to formulate decision on control message to be sent */
+ oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+ oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+
+ numread = localtdata->tdata->buffer->f.numread;
+ /* Process each oid in the machine pile/ group per thread */
+ for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
+ if (i < localtdata->tdata->buffer->f.numread) {
+ int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array
+ incr *= i;
+ oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
+ version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
+ } else { // Objects Modified
+ int tmpsize;
+ headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
+ if (headptr == NULL) {
+ printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ oid = OID(headptr);
+ version = headptr->version;
+ }
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+
+ /* Save the oids not found and number of oids not found for later use */
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[numoidnotfound] = oid;
+ numoidnotfound++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if (test_and_set(STATUSPTR(mobj))) {
+ if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
+ v_matchlock++;
+ } else {/* If versions don't match ...HARD ABORT */
+ v_nomatch++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ }
+ } else {
+ //we're locked
+ /* Save all object oids that are locked on this machine during this transaction request call */
+ oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
+ numoidlocked++;
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ v_matchnolock++;
+ } else { /* If versions don't match ...HARD ABORT */
+ v_nomatch++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ }
+ }
+ }
+ } // End for
+ /* Condition to send TRANS_AGREE */
+ if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
+ }
+ /* Condition to send TRANS_SOFT_ABORT */
+ if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
+ }
+
+ /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
+ * if Participant receives a TRANS_COMMIT */
+ localtdata->transinfo->objlocked = oidlocked;
+ localtdata->transinfo->objnotfound = oidnotfound;
+ localtdata->transinfo->modptr = NULL;
+ localtdata->transinfo->numlocked = numoidlocked;
+ localtdata->transinfo->numnotfound = numoidnotfound;
+ /* Lock and update count */
+ //Thread sleeps until all messages from pariticipants are received by coordinator
+ pthread_mutex_lock(localtdata->tdata->lock);
+ (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
+
+ /* Wake up the threads and invoke decideResponse (once) */
+ if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
+ decideResponse(localtdata->tdata);
+ pthread_cond_broadcast(localtdata->tdata->threshold);
+ } else {
+ pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
+ }
+ pthread_mutex_unlock(localtdata->tdata->lock);
+ if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
+ if(transAbortProcess(localtdata) != 0) {
+ printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
+ } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
+ if(transComProcess(localtdata) != 0) {
+ printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
+ }
+ /* Free memory */
+ if (localtdata->transinfo->objlocked != NULL) {
+ free(localtdata->transinfo->objlocked);
+ }
+ if (localtdata->transinfo->objnotfound != NULL) {
+ free(localtdata->transinfo->objnotfound);
+ }
+
+ pthread_exit(NULL);
+}
+
+/* This function completes the ABORT process if the transaction is aborting */
+int transAbortProcess(local_thread_data_array_t *localtdata) {
+ int i, numlocked;
+ unsigned int *objlocked;
+ void *header;
+
+ numlocked = localtdata->transinfo->numlocked;
+ objlocked = localtdata->transinfo->objlocked;
+
+ for (i = 0; i < numlocked; i++) {
+ if((header = mhashSearch(objlocked[i])) == NULL) {
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ UnLock(STATUSPTR(header));
+ }
+
+ return 0;
+}
+
+/*This function completes the COMMIT process is the transaction is commiting*/
+int transComProcess(local_thread_data_array_t *localtdata) {
+ objheader_t *header, *tcptr;
+ int i, nummod, tmpsize, numcreated, numlocked;
+ unsigned int *oidmod, *oidcreated, *oidlocked;
+ void *ptrcreate;
+
+ nummod = localtdata->tdata->buffer->f.nummod;
+ oidmod = localtdata->tdata->buffer->oidmod;
+ numcreated = localtdata->tdata->buffer->f.numcreated;
+ oidcreated = localtdata->tdata->buffer->oidcreated;
+ numlocked = localtdata->transinfo->numlocked;
+ oidlocked = localtdata->transinfo->objlocked;
+
+ for (i = 0; i < nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+ printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ /* Copy from transaction cache -> main object store */
+ if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
+ printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ GETSIZE(tmpsize, header);
+ char *tmptcptr = (char *) tcptr;
+ memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
+ header->version += 1;
+ if(header->notifylist != NULL) {
+ notifyAll(&header->notifylist, OID(header), header->version);
+ }
+ }
+ /* If object is newly created inside transaction then commit it */
+ for (i = 0; i < numcreated; i++) {
+ if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
+ printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
+ return 1;
+ }
+ GETSIZE(tmpsize, header);
+ tmpsize += sizeof(objheader_t);
+ pthread_mutex_lock(&mainobjstore_mutex);
+ if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
+ printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&mainobjstore_mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&mainobjstore_mutex);
+ memcpy(ptrcreate, header, tmpsize);
+ mhashInsert(oidcreated[i], ptrcreate);
+ lhashInsert(oidcreated[i], myIpAddr);
+ }
+ /* Unlock locked objects */
+ for(i = 0; i < numlocked; i++) {
+ if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ UnLock(STATUSPTR(header));
+ }
+
+ return 0;
+}
+
+prefetchpile_t *foundLocal(char *ptr) {
+ int ntuples = *(GET_NTUPLES(ptr));
+ unsigned int * oidarray = GET_PTR_OID(ptr);
+ unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
+ short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+ prefetchpile_t * head=NULL;
+
+ int i;
+ for(i=0;i<ntuples; i++) {
+ unsigned short baseindex=(i==0)?0:endoffsets[i-1];
+ unsigned short endindex=endoffsets[i];
+ unsigned int oid=oidarray[i];
+ int newbase;
+ int machinenum;
+ if (oid==0)
+ continue;
+ //Look up fields locally
+ for(newbase=baseindex;newbase<endindex;newbase++) {
+ if (!lookupObject(&oid, arryfields[newbase]))
+ break;
+ //Ended in a null pointer...
+ if (oid==0)
+ goto tuple;
+ }
+ //Entire prefetch is local
+ if (newbase==endindex&&checkoid(oid))
+ goto tuple;
+ //Add to remote requests
+ machinenum=lhashSearch(oid);
+ insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
+ tuple:
+ ;
+ }
+ return head;
+}
+
+int checkoid(unsigned int oid) {
+ objheader_t *header;
+ if ((header=mhashSearch(oid))!=NULL) {
+ //Found on machine
+ return 1;
+ } else if ((header=prehashSearch(oid))!=NULL) {
+ //Found in cache
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+int lookupObject(unsigned int * oid, short offset) {
+ objheader_t *header;
+ if ((header=mhashSearch(*oid))!=NULL) {
+ //Found on machine
+ ;
+ } else if ((header=prehashSearch(*oid))!=NULL) {
+ //Found in cache
+ ;
+ } else {
+ return 0;
+ }
+
+ if(TYPE(header) > NUMCLASSES) {
+ int elementsize = classsize[TYPE(header)];
+ struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+ int length = ao->___length___;
+ /* Check if array out of bounds */
+ if(offset < 0 || offset >= length) {
+ //if yes treat the object as found
+ (*oid)=0;
+ return 1;
+ }
+ (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
+ return 1;
+ } else {
+ (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
+ return 1;
+ }
+}
+
+
+/* This function is called by the thread calling transPrefetch */
+void *transPrefetch(void *t) {
+ while(1) {
+ /* lock mutex of primary prefetch queue */
+ void *node=gettail();
+ /* Check if the tuples are found locally, if yes then reduce them further*/
+ /* and group requests by remote machine ids by calling the makePreGroups() */
+ prefetchpile_t *pilehead = foundLocal(node);
+
+ if (pilehead!=NULL) {
+ // Get sock from shared pool
+ int sd = getSock2(transPrefetchSockPool, pilehead->mid);
+
+ /* Send Prefetch Request */
+ prefetchpile_t *ptr = pilehead;
+ while(ptr != NULL) {
+ sendPrefetchReq(ptr, sd);
+ ptr = ptr->next;
+ }
+
+ /* Release socket */
+ // freeSock(transPrefetchSockPool, pilehead->mid, sd);
+
+ /* Deallocated pilehead */
+ mcdealloc(pilehead);
+ }
+ // Deallocate the prefetch queue pile node
+ inctail();
+ }
+}
+
+void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
+ objpile_t *tmp;
+
+ int size=sizeof(char)+sizeof(int);
+ for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
+ size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+ }
+
+ char buft[size];
+ char *buf=buft;
+ *buf=TRANS_PREFETCH;
+ buf+=sizeof(char);
+
+ for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
+ int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+ *((int*)buf)=len;
+ buf+=sizeof(int);
+ *((unsigned int *)buf)=tmp->oid;
+ buf+=sizeof(unsigned int);
+ *((unsigned int *)(buf)) = myIpAddr;
+ buf+=sizeof(unsigned int);
+ memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
+ buf+=tmp->numoffset*sizeof(short);
+ }
+ *((int *)buf)=-1;
+ send_data(sd, buft, size);
+ return;
+}
+
+void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
+ int len, endpair;
+ char control;
+ objpile_t *tmp;
+
+ /* Send TRANS_PREFETCH control message */
+ control = TRANS_PREFETCH;
+ send_data(sd, &control, sizeof(char));
+
+ /* Send Oids and offsets in pairs */
+ tmp = mcpilenode->objpiles;
+ while(tmp != NULL) {
+ len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+ char oidnoffset[len];
+ char *buf=oidnoffset;
+ *((int*)buf) = tmp->numoffset;
+ buf+=sizeof(int);
+ *((unsigned int *)buf) = tmp->oid;
+ buf+=sizeof(unsigned int);
+ *((unsigned int *)buf) = myIpAddr;
+ buf += sizeof(unsigned int);
+ memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
+ send_data(sd, oidnoffset, len);
+ tmp = tmp->next;
+ }
+
+ /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
+ endpair = -1;
+ send_data(sd, &endpair, sizeof(int));
+
+ return;
+}
+
+int getPrefetchResponse(int sd) {
+ int length = 0, size = 0;
+ char control;
+ unsigned int oid;
+ void *modptr, *oldptr;
+
+ recv_data((int)sd, &length, sizeof(int));
+ size = length - sizeof(int);
+ char recvbuffer[size];
+
+ recv_data((int)sd, recvbuffer, size);
+ control = *((char *) recvbuffer);
+ if(control == OBJECT_FOUND) {
+ oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+ //printf("oid %d found\n",oid);
+ size = size - (sizeof(char) + sizeof(unsigned int));
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
+ printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ return -1;
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
+ STATUS(modptr)=0;
+
+ /* Insert the oid and its address into the prefetch hash lookup table */
+ /* Do a version comparison if the oid exists */
+ if((oldptr = prehashSearch(oid)) != NULL) {
+ /* If older version then update with new object ptr */
+ if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
+ prehashRemove(oid);
+ prehashInsert(oid, modptr);
+ }
+ } else {/* Else add the object ptr to hash table*/
+ prehashInsert(oid, modptr);
+ }
+ /* Lock the Prefetch Cache look up table*/
+ pthread_mutex_lock(&pflookup.lock);
+ /* Broadcast signal on prefetch cache condition variable */
+ pthread_cond_broadcast(&pflookup.cond);
+ /* Unlock the Prefetch Cache look up table*/
+ pthread_mutex_unlock(&pflookup.lock);
+ } else if(control == OBJECT_NOT_FOUND) {
+ oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+ /* TODO: For each object not found query DHT for new location and retrieve the object */
+ /* Throw an error */
+ printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
+ // exit(-1);
+ } else {
+ printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
+ }
+
+ return 0;
+}
+
+unsigned short getObjType(unsigned int oid) {
+ objheader_t *objheader;
+ unsigned short numoffset[] ={0};
+ short fieldoffset[] ={};
+
+ if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
+ if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
+ prefetch(1, &oid, numoffset, fieldoffset);
+ pthread_mutex_lock(&pflookup.lock);
+ while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
+ pthread_cond_wait(&pflookup.cond, &pflookup.lock);