#include "addUdpEnhance.h"
#include "addPrefetchEnhance.h"
#include "gCollect.h"
+#include "dsmlock.h"
#ifdef COMPILER
#include "thread.h"
#endif
int nmhashSearch = 0;
int nprehashSearch = 0;
int nRemoteSend = 0;
+int nSoftAbort = 0;
void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
t = time(NULL);
req.tv_sec = 0;
- req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
+ req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
nanosleep(&req, NULL);
return;
}
free(thread_data_array);
free(ltdata);
randomdelay();
+#ifdef TRANSSTATS
+ nSoftAbort++;
+#endif
}
/* Retry trans commit procedure during soft_abort case */
pthread_mutex_lock(&prefetchcache_mutex);
if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
pthread_exit(NULL);
}
pthread_mutex_unlock(&prefetchcache_mutex);
int numread, i;
unsigned int oid;
unsigned short version;
- void *mobj;
- objheader_t *headptr;
localtdata = (local_thread_data_array_t *) threadarg;
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
- oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+ oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
+ //setting a divider of read locks
+ //and write locks
numread = localtdata->tdata->buffer->f.numread;
/* Process each oid in the machine pile/ group per thread */
incr *= i;
oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
+ commitCountForObjRead(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
} else { // Objects Modified
+ if(i == localtdata->tdata->buffer->f.numread) {
+ oidlocked[numoidlocked] = -1;
+ numoidlocked++;
+ }
int tmpsize;
+ objheader_t *headptr;
headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
if (headptr == NULL) {
printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
}
oid = OID(headptr);
version = headptr->version;
+ commitCountForObjMod(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
}
- /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+ }
- /* Save the oids not found and number of oids not found for later use */
- if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
- /* Save the oids not found and number of oids not found for later use */
- oidnotfound[numoidnotfound] = oid;
- numoidnotfound++;
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* Check if Obj is locked by any previous transaction */
- if (test_and_set(STATUSPTR(mobj))) {
- if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
- v_matchlock++;
- } else { /* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- break;
- }
- } else {
- //we're locked
- /* Save all object oids that are locked on this machine during this transaction request call */
- oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
- numoidlocked++;
- if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
- v_matchnolock++;
- } else { /* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- break;
- }
- }
- }
- } // End for
- /* Condition to send TRANS_AGREE */
+ /* Condition to send TRANS_AGREE */
if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
}
localtdata->transinfo->modptr = NULL;
localtdata->transinfo->numlocked = numoidlocked;
localtdata->transinfo->numnotfound = numoidnotfound;
+
/* Lock and update count */
//Thread sleeps until all messages from pariticipants are received by coordinator
pthread_mutex_lock(localtdata->tdata->lock);
pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
}
pthread_mutex_unlock(localtdata->tdata->lock);
+
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) {
if(transAbortProcess(localtdata) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
free(localtdata->transinfo->objnotfound);
}
pthread_exit(NULL);
+
+}
+
+/* Commit info for objects modified */
+void commitCountForObjMod(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+ int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
+ void *mobj;
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+ /* Save the oids not found and number of oids not found for later use */
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[*numoidnotfound] = oid;
+ (*numoidnotfound)++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
+ if (version == ((objheader_t *)mobj)->version) { /* match versions */
+ (*v_matchnolock)++;
+ //Keep track of what is locked
+ oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+ (*numoidlocked)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ //Keep track of what is locked
+ oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+ (*numoidlocked)++;
+ return;
+ }
+ } else { //A lock is acquired some place else
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ (*v_matchlock)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ return;
+ }
+ }
+ }
+}
+
+/* Commit info for objects modified */
+void commitCountForObjRead(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+ int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
+ void *mobj;
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+ /* Save the oids not found and number of oids not found for later use */
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[*numoidnotfound] = oid;
+ (*numoidnotfound)++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
+ if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
+ (*v_matchnolock)++;
+ //Keep track of what is locked
+ oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+ (*numoidlocked)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ //Keep track of what is locked
+ oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+ (*numoidlocked)++;
+ return;
+ }
+ } else { //Has reached max number of readers or some other transaction
+ //has acquired a lock on this object
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ (*v_matchlock)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ return;
+ }
+ }
+ }
}
/* This function completes the ABORT process if the transaction is aborting */
numlocked = localtdata->transinfo->numlocked;
objlocked = localtdata->transinfo->objlocked;
+ int useWriteUnlock = 0;
for (i = 0; i < numlocked; i++) {
+ if(objlocked[i] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
if((header = mhashSearch(objlocked[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
- UnLock(STATUSPTR(header));
+ if(!useWriteUnlock) {
+ read_unlock(STATUSPTR(header));
+ } else {
+ write_unlock(STATUSPTR(header));
+ }
}
return 0;
return 1;
}
pthread_mutex_unlock(&mainobjstore_mutex);
+ /* Initialize read and write locks */
+ initdsmlocks(STATUSPTR(header));
memcpy(ptrcreate, header, tmpsize);
mhashInsert(oidcreated[i], ptrcreate);
lhashInsert(oidcreated[i], myIpAddr);
}
/* Unlock locked objects */
+ int useWriteUnlock = 0;
for(i = 0; i < numlocked; i++) {
+ if(oidlocked[i] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
- UnLock(STATUSPTR(header));
+ if(!useWriteUnlock) {
+ read_unlock(STATUSPTR(header));
+ } else {
+ write_unlock(STATUSPTR(header));
+ }
}
-
return 0;
}
/* Clear Flags */
STATUS(headeraddr) =0;
+
return pile;
}