if (fixed->nummod > 0)
free(modptr);
/* Unlock objects that was locked due to this transaction */
- int useWriteUnlock = 0;
+ int useWriteUnlock = 0; //TODO verify is this piece of unlocking code ever used
for(i = 0; i< transinfo->numlocked; i++) {
if(transinfo->objlocked[i] == -1) {
useWriteUnlock = 1;
* Object store holds the modified objects involved in the transaction request */
ptr = (char *) modptr;
+ char retval;
+
/* Process each oid in the machine pile/ group per thread */
for (i = 0; i < fixed->numread + fixed->nummod; i++) {
if (i < fixed->numread) { //Objs only read and not modified
oid = *((unsigned int *)(objread + incr));
incr += sizeof(unsigned int);
version = *((unsigned short *)(objread + incr));
- getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
+ retval=getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
&v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
} else { //Objs modified
if(i == fixed->numread) {
- oidlocked[objlocked++] = -1;
+ oidlocked[objlocked++] = -1;
}
int tmpsize;
headptr = (objheader_t *) ptr;
version = headptr->version;
GETSIZE(tmpsize, headptr);
ptr += sizeof(objheader_t) + tmpsize;
- getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
+ retval=getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
&objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
&numBytes, &control, oid, version);
}
+ if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
+ //unlock objects as soon versions mismatch or locks cannot be acquired)
+ if (objlocked > 0) {
+ int useWriteUnlock = 0;
+ for(j = 0; j < objlocked; j++) {
+ if(oidlocked[j] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ if(useWriteUnlock) {
+ write_unlock(STATUSPTR(headptr));
+ } else {
+ read_unlock(STATUSPTR(headptr));
+ }
+ }
+ if(v_nomatch > 0)
+ free(oidlocked);
+ }
+ objlocked=0;
+ break;
+ }
+ }
+ //go through rest of the objects for version mismatches
+ if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
+ i++;
+ procRestObjs(objread, ptr, i, fixed->numread, fixed->nummod, oidnotfound, oidvernotmatch, &objnotfound, &objvernotmatch, &v_nomatch, &numBytes);
}
/* send TRANS_DISAGREE and objs*/
offset += size;
}
#endif
+ /*
if (objlocked > 0) {
int useWriteUnlock = 0;
for(j = 0; j < objlocked; j++) {
- if(oidlocked[j] == -1) {
- useWriteUnlock = 1;
- continue;
- }
- if((headptr = mhashSearch(oidlocked[j])) == NULL) {
- printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 0;
- }
- if(useWriteUnlock) {
- write_unlock(STATUSPTR(headptr));
- } else {
- read_unlock(STATUSPTR(headptr));
- }
+ if(oidlocked[j] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ if(useWriteUnlock) {
+ write_unlock(STATUSPTR(headptr));
+ } else {
+ read_unlock(STATUSPTR(headptr));
+ }
}
free(oidlocked);
}
+ */
+ //control=TRANS_DISAGREE;
send_data(acceptfd, &control, sizeof(char));
#ifdef CACHE
send_data(acceptfd, &numBytes, sizeof(int));
}
/* Update Commit info for objects that are read */
-void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
+char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
char *control, unsigned int oid, unsigned short version) {
/* Save the oids not found and number of oids not found for later use */
oidnotfound[*objnotfound] = oid;
(*objnotfound)++;
+ *control = TRANS_DISAGREE;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
if (version == ((objheader_t *)mobj)->version) { /* match versions */
(*v_matchnolock)++;
+ *control = TRANS_AGREE;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[*objvernotmatch] = oid;
} else { //we are locked
if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
(*v_matchlock)++;
+ *control=TRANS_SOFT_ABORT;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[*objvernotmatch] = oid;
}
}
}
+ return *control;
}
/* Update Commit info for objects that are read */
-void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
+char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
void *mobj;
/* Save the oids not found and number of oids not found for later use */
oidnotfound[*objnotfound] = oid;
(*objnotfound)++;
+ *control = TRANS_DISAGREE;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
if (version == ((objheader_t *)mobj)->version) { /* match versions */
(*v_matchnolock)++;
+ *control=TRANS_AGREE;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[(*objvernotmatch)++] = oid;
} else { /* Some other transaction has aquired a write lock on this object */
if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
(*v_matchlock)++;
+ *control=TRANS_SOFT_ABORT;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[*objvernotmatch] = oid;
}
}
}
+ return *control;
}
-/* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
- * to send to Coordinator based on the votes of oids involved in the transaction */
-char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
- int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
- unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
- int val;
- char control = 0;
-
- /* Condition to send TRANS_AGREE */
- if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
- control = TRANS_AGREE;
- /* Send control message */
- send_data(acceptfd, &control, sizeof(char));
+void procRestObjs(char *objread,
+ char *objmod,
+ int index,
+ int numread,
+ int nummod,
+ unsigned int *oidnotfound,
+ unsigned int *oidvernotmatch,
+ int *objnotfound,
+ int *objvernotmatch,
+ int *v_nomatch,
+ int *numBytes) {
+ int i;
+ unsigned int oid;
+ unsigned short version;
+
+ /* Process each oid in the machine pile/ group per thread */
+ //printf("DEBUG: index= %d, numread= %d, nummod= %d numread+nummod= %d\n", index,numread,nummod,numread+nummod);
+ for (i = index; i < numread+nummod; i++) {
+ //printf("DEBUG: i= %d\n", i);
+ //fflush(stdout);
+ if (i < numread) { //Objs only read and not modified
+ int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
+ incr *= i;
+ oid = *((unsigned int *)(objread + incr));
+ incr += sizeof(unsigned int);
+ version = *((unsigned short *)(objread + incr));
+ } else { //Objs modified
+ objheader_t *headptr;
+ headptr = (objheader_t *) objmod;
+ oid = OID(headptr);
+ version = headptr->version;
+ int tmpsize;
+ GETSIZE(tmpsize, headptr);
+ objmod += sizeof(objheader_t) + tmpsize;
+ }
+ processVerNoMatch(oidnotfound,
+ oidvernotmatch,
+ objnotfound,
+ objvernotmatch,
+ v_nomatch,
+ numBytes,
+ oid,
+ version);
}
- /* Condition to send TRANS_SOFT_ABORT */
- if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
- control = TRANS_SOFT_ABORT;
+ return;
+}
- /* Send control message */
- send_data(acceptfd, &control, sizeof(char));
+void processVerNoMatch(unsigned int *oidnotfound,
+ unsigned int *oidvernotmatch,
+ int *objnotfound,
+ int *objvernotmatch,
+ int *v_nomatch,
+ int *numBytes,
+ unsigned int oid,
+ unsigned short version) {
+ void *mobj;
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
- /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
- if(*(objnotfound) != 0) {
- int msg[1];
- msg[0] = *(objnotfound);
- send_data(acceptfd, &msg, sizeof(int));
- int size = sizeof(unsigned int)* *(objnotfound);
- send_data(acceptfd, oidnotfound, size);
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[*objnotfound] = oid;
+ (*objnotfound)++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ //if (!write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
+ if (version != ((objheader_t *)mobj)->version) { /* match versions */
+ (*v_nomatch)++;
+ oidvernotmatch[*objvernotmatch] = oid;
+ (*objvernotmatch)++;
+ int size;
+ GETSIZE(size, mobj);
+ size += sizeof(objheader_t);
+ *numBytes += size;
}
}
-
- /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
- * if Participant receives a TRANS_COMMIT */
- transinfo->objlocked = oidlocked;
- transinfo->objnotfound = oidnotfound;
- transinfo->modptr = modptr;
- transinfo->numlocked = *(objlocked);
- transinfo->numnotfound = *(objnotfound);
- return control;
}
-/* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
- * addresses in lookup table and also changes version number
- * Sends an ACK back to Coordinator */
-int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
- objheader_t *header;
- objheader_t *newheader;
- int i = 0, offset = 0;
- char control;
- int tmpsize;
-
- /* Process each modified object saved in the mainobject store */
- for(i = 0; i < nummod; i++) {
- if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
- printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
+ /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
+ * to send to Coordinator based on the votes of oids involved in the transaction */
+ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
+ int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
+ unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
+ int val;
+ char control = 0;
+
+ /* Condition to send TRANS_AGREE */
+ if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
+ control = TRANS_AGREE;
+ /* Send control message */
+ send_data(acceptfd, &control, sizeof(char));
}
- GETSIZE(tmpsize,header);
-
- {
- struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
- struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
- dst->type=src->type;
- dst->___cachedCode___=src->___cachedCode___;
- dst->___cachedHash___=src->___cachedHash___;
- memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
- }
- header->version += 1;
- /* If threads are waiting on this object to be updated, notify them */
- if(header->notifylist != NULL) {
- notifyAll(&header->notifylist, OID(header), header->version);
+ /* Condition to send TRANS_SOFT_ABORT */
+ if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
+ //if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
+ control = TRANS_SOFT_ABORT;
+
+ /* Send control message */
+ send_data(acceptfd, &control, sizeof(char));
+
+ /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
+ if(*(objnotfound) != 0) {
+ int msg[1];
+ msg[0] = *(objnotfound);
+ send_data(acceptfd, &msg, sizeof(int));
+ int size = sizeof(unsigned int)* *(objnotfound);
+ send_data(acceptfd, oidnotfound, size);
+ }
}
- offset += sizeof(objheader_t) + tmpsize;
+
+ /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
+ * if Participant receives a TRANS_COMMIT */
+ transinfo->objlocked = oidlocked;
+ transinfo->objnotfound = oidnotfound;
+ transinfo->modptr = modptr;
+ transinfo->numlocked = *(objlocked);
+ transinfo->numnotfound = *(objnotfound);
+ return control;
}
+ /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
+ * addresses in lookup table and also changes version number
+ * Sends an ACK back to Coordinator */
+ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
+ objheader_t *header;
+ objheader_t *newheader;
+ int i = 0, offset = 0;
+ char control;
+ int tmpsize;
+
+ /* Process each modified object saved in the mainobject store */
+ for(i = 0; i < nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ GETSIZE(tmpsize,header);
+
+ {
+ struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
+ struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
+ dst->type=src->type;
+ dst->___cachedCode___=src->___cachedCode___;
+ dst->___cachedHash___=src->___cachedHash___;
+ memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
+ }
+ header->version += 1;
+ /* If threads are waiting on this object to be updated, notify them */
+ if(header->notifylist != NULL) {
+ notifyAll(&header->notifylist, OID(header), header->version);
+ }
+ offset += sizeof(objheader_t) + tmpsize;
+ }
+
if (nummod > 0)
free(modptr);