}
/* This function initializes things required in the transaction start*/
-transrecord_t *transStart() {
+__attribute__((malloc)) transrecord_t *transStart() {
transrecord_t *tmp;
if((tmp = calloc(1, sizeof(transrecord_t))) == NULL) {
printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
}
tmp->cache = objstrCreate(1048576);
tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
-#ifdef COMPILER
- tmp->revertlist=NULL;
-#endif
+ //#ifdef COMPILER
+ // tmp->revertlist=NULL; //Not necessary...already null
+ //#endif
return tmp;
}
/* This function finds the location of the objects involved in a transaction
* and returns the pointer to the object if found in a remote location */
-objheader_t *transRead(transrecord_t *record, unsigned int oid) {
+__attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int oid) {
unsigned int machinenumber;
objheader_t *tmp, *objheader;
objheader_t *objcopy;
#endif
} else
*/
+
+#ifdef ABORTREADERS
+ if (trans->abort) {
+ //abort this transaction
+ longjmp(trans->aborttrans,1);
+ } else
+ addtransaction(oid,record);
+#endif
+
if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
#ifdef TRANSSTATS
nmhashSearch++;
free(listmid);
return 1;
}
+
+
/* Invalidate objects in other machine cache */
if(tosend[i].f.nummod > 0) {
if((retval = invalidateObj(&(tosend[i]))) != 0) {
return 1;
}
}
+#ifdef ABORTREADERS
+ removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread, record);
+#endif
+ }
+#ifdef ABORTREADERS
+ else if (!treplyretry) {
+ removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record);
+ removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread,record);
}
+#endif
#endif
send_data(sd, &finalResponse, sizeof(char));
} else {
/* Complete local processing */
doLocalProcess(finalResponse, &(tosend[i]), &transinfo, record);
+#ifdef ABORTREADERS
+ if(finalResponse == TRANS_COMMIT) {
+ removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread, record);
+ } else if (!treplyretry) {
+ removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record);
+ removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread,record);
+ }
+#endif
}
}
commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
} else { // Objects Modified
if(i == tdata->f.numread) {
- oidlocked[numoidlocked] = -1;
- numoidlocked++;
+ oidlocked[numoidlocked++] = -1;
}
int tmpsize;
objheader_t *headptr;
fflush(stdout);
return;
}
+ } else {
+ printf("ERROR...No Decision\n");
}
/* Free memory */
if (version == ((objheader_t *)mobj)->version) { /* match versions */
(*v_matchnolock)++;
//Keep track of what is locked
- oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
- (*numoidlocked)++;
+ oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
*getReplyCtrl = TRANS_DISAGREE;
//Keep track of what is locked
- oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
- (*numoidlocked)++;
+ oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
//printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
(*v_matchnolock)++;
//Keep track of what is locked
- oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
- (*numoidlocked)++;
+ oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
*getReplyCtrl = TRANS_DISAGREE;
//Keep track of what is locked
- oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
- (*numoidlocked)++;
+ oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
//printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
}
GETSIZE(tmpsize, header);
char *tmptcptr = (char *) tcptr;
- memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
+ {
+ struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
+ struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
+ dst->___cachedCode___=src->___cachedCode___;
+ dst->___cachedHash___=src->___cachedHash___;
+
+ memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
+ }
+
header->version += 1;
if(header->notifylist != NULL) {
notifyAll(&header->notifylist, OID(header), header->version);
*((unsigned int *)(&msg[1])) = numoid;
/* Send array of oids */
size = sizeof(unsigned int);
- {
- i = 0;
- while(i < numoid) {
- oid = oidarry[i];
- *((unsigned int *)(&msg[1] + size)) = oid;
- size += sizeof(unsigned int);
- i++;
- }
+
+ for(i = 0;i < numoid; i++) {
+ oid = oidarry[i];
+ *((unsigned int *)(&msg[1] + size)) = oid;
+ size += sizeof(unsigned int);
}
/* Send array of version */
- {
- i = 0;
- while(i < numoid) {
- version = versionarry[i];
- *((unsigned short *)(&msg[1] + size)) = version;
- size += sizeof(unsigned short);
- i++;
- }
+ for(i = 0;i < numoid; i++) {
+ version = versionarry[i];
+ *((unsigned short *)(&msg[1] + size)) = version;
+ size += sizeof(unsigned short);
}
- *((unsigned int *)(&msg[1] + size)) = myIpAddr;
- size += sizeof(unsigned int);
+ *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
*((unsigned int *)(&msg[1] + size)) = threadid;
pthread_mutex_lock(&(ndata->threadnotify));
size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
prehashRemove(oid);
}
#endif
+ pthread_mutex_lock(&(ndata->threadnotify));
pthread_cond_signal(&(ndata->threadcond));
+ pthread_mutex_unlock(&(ndata->threadnotify));
}
}
}
}
void transAbort(transrecord_t *trans) {
+#ifdef ABORTREADERS
+ removetransactionhash(trans->lookupTable, trans);
+#endif
objstrDelete(trans->cache);
chashDelete(trans->lookupTable);
free(trans);