/* Read control message from Participant */
recv_data(sd, &control, sizeof(char));
+ /* Recv Objects if participant sends TRANS_DISAGREE */
+ if(control == TRANS_DISAGREE) {
+ int length;
+ recv_data(sd, &length, sizeof(int));
+ void *newAddr;
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((newAddr = objstrAlloc(prefetchcache, length)) == NULL) {
+ printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ close(sd);
+ pthread_exit(NULL);
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ recv_data(sd, newAddr, length);
+ int offset = 0;
+ while(length != 0) {
+ unsigned int oidToPrefetch;
+ objheader_t * header;
+ header = (objheader_t *) (((char *)newAddr) + offset);
+ oidToPrefetch = OID(header);
+ int size = 0;
+ GETSIZE(size, header);
+ size += sizeof(objheader_t);
+ //make an entry in prefetch hash table
+ prehashInsert(oidToPrefetch, header);
+ length = length - size;
+ offset += size;
+ }
+ }
recvcontrol = control;
/* Update common data structure and increment count */
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
}
pthread_mutex_unlock(tdata->lock);
- /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
+ /* Send the final response such as TRANS_COMMIT or TRANS_ABORT
* to all participants in their respective socket */
if (sendResponse(tdata, sd) == 0) {
printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
*(tdata->replyctrl) = TRANS_ABORT;
*(tdata->replyretry) = 0;
/* clear objects from prefetch cache */
- for (i = 0; i < tdata->buffer->f.numread; i++)
+ for (i = 0; i < tdata->buffer->f.numread; i++) {
prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
- for (i = 0; i < tdata->buffer->f.nummod; i++)
+ }
+ for (i = 0; i < tdata->buffer->f.nummod; i++) {
prehashRemove(tdata->buffer->oidmod[i]);
+ }
} else if(transagree == tdata->buffer->f.mcount){
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
*(tdata->replyretry) = 0;
+ /* update prefetch cache */
+ /* For objects read */
+ char oidType;
+ int retval;
+ oidType = 'R';
+ if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
+ oidType = 'M';
+ if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
} else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
*(tdata->replyretry) = 1;
}
-
return;
}
+/* This function updates the prefetch cache when commiting objects
+ * based on the type of oid i.e. if oid is read or oid is modified
+ * Return -1 on error else returns 0
+ */
+int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
+ int i;
+ for (i = 0; i < numoid; i++) {
+ //find address object
+ objheader_t *header, *newAddr;
+ int size;
+ unsigned int oid;
+ if(oidType == 'R') {
+ oid = *((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i));
+ } else {
+ oid = tdata->buffer->oidmod[i];
+ }
+ header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
+ //copy object into prefetch cache
+ GETSIZE(size, header);
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) {
+ printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ memcpy(newAddr, header, (size + sizeof(objheader_t)));
+ //make an entry in prefetch hash table
+ prehashInsert(oid, newAddr);
+ }
+ return 0;
+}
+
/* This function sends the final response to remote machines per
* thread in their respective socket id It returns a char that is only
* needed to check the correctness of execution of this function
recv_data(sd, &size, sizeof(int));
objcopy = objstrAlloc(record->cache, size);
recv_data(sd, objcopy, size);
-
/* Insert into cache's lookup table */
chashInsert(record->lookupTable, oid, objcopy);
}
void *handleLocalReq(void *threadarg) {
unsigned int *oidnotfound = NULL, *oidlocked = NULL;
local_thread_data_array_t *localtdata;
- int objnotfound = 0, objlocked = 0;
+ int numoidnotfound = 0, numoidlocked = 0;
int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
int numread, i;
unsigned int oid;
/* Save the oids not found and number of oids not found for later use */
if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
/* Save the oids not found and number of oids not found for later use */
- oidnotfound[objnotfound] = oid;
- objnotfound++;
+ oidnotfound[numoidnotfound] = oid;
+ numoidnotfound++;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
if (test_and_set(STATUSPTR(mobj))) {
} else {
//we're locked
/* Save all object oids that are locked on this machine during this transaction request call */
- oidlocked[objlocked] = OID(((objheader_t *)mobj));
- objlocked++;
+ oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
+ numoidlocked++;
if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
v_matchnolock++;
} else { /* If versions don't match ...HARD ABORT */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
}
/* Condition to send TRANS_SOFT_ABORT */
- if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
+ if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
}
localtdata->transinfo->objlocked = oidlocked;
localtdata->transinfo->objnotfound = oidnotfound;
localtdata->transinfo->modptr = NULL;
- localtdata->transinfo->numlocked = objlocked;
- localtdata->transinfo->numnotfound = objnotfound;
+ localtdata->transinfo->numlocked = numoidlocked;
+ localtdata->transinfo->numnotfound = numoidnotfound;
/* Lock and update count */
//Thread sleeps until all messages from pariticipants are received by coordinator
pthread_mutex_lock(localtdata->tdata->lock);
}
UnLock(STATUSPTR(header));
}
-
+
return 0;
}