extern long long bigarray4[6*1024*1024];
extern int bigarray5[6*1024*1024];
extern int bigindex1;
-#define LOGTIME(x,y,z,a,b) {\
- int tmp=bigindex1; \
- bigarray1[tmp]=x; \
- bigarray2[tmp]=y; \
- bigarray3[tmp]=z; \
- bigarray4[tmp]=a; \
- bigarray5[tmp]=b; \
- bigindex1++; \
+#define LOGTIME(x,y,z,a,b) { \
+ int tmp=bigindex1; \
+ bigarray1[tmp]=x; \
+ bigarray2[tmp]=y; \
+ bigarray3[tmp]=z; \
+ bigarray4[tmp]=a; \
+ bigarray5[tmp]=b; \
+ bigindex1++; \
}
#else
#define LOGTIME(x,y,z,a,b)
#endif
-long long myrdtsc(void)
-{
- unsigned hi, lo;
- __asm__ __volatile__ ("rdtsc" : "=a"(lo), "=d"(hi));
+long long myrdtsc(void) {
+ unsigned hi, lo;
+ __asm__ __volatile__ ("rdtsc" : "=a" (lo), "=d" (hi));
return ( (unsigned long long)lo)|( ((unsigned long long)hi)<<32 );
}
/* If object is write locked, just wait */
/* May want to remove at some point */
while((*(volatile int *)STATUSPTR(h))<=0)
- sched_yield();
+ sched_yield();
GETSIZE(size, h);
size += sizeof(objheader_t);
sockid = (int) acceptfd;
return 1;
}
ptr = (char *) modptr;
- for(i = 0 ; i < fixed.nummod; i++) {
+ for(i = 0; i < fixed.nummod; i++) {
headaddr = (objheader_t *) ptr;
oid = OID(headaddr);
oidmod[i] = oid;
incr += sizeof(unsigned int);
version = *((unsigned short *)(objread + incr));
retval=getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
- &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
+ &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
} else { //Objs modified
if(i == fixed->numread) {
- oidlocked[objlocked++] = -1;
+ oidlocked[objlocked++] = -1;
}
int tmpsize;
headptr = (objheader_t *) ptr;
GETSIZE(tmpsize, headptr);
ptr += sizeof(objheader_t) + tmpsize;
retval=getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
- &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
- &numBytes, &control, oid, version);
+ &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
+ &numBytes, &control, oid, version);
}
if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
//unlock objects as soon versions mismatch or locks cannot be acquired)
if (objlocked > 0) {
- int useWriteUnlock = 0;
- for(j = 0; j < objlocked; j++) {
- if(oidlocked[j] == -1) {
- useWriteUnlock = 1;
- continue;
- }
- if((headptr = mhashSearch(oidlocked[j])) == NULL) {
- printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 0;
- }
- if(useWriteUnlock) {
- write_unlock(STATUSPTR(headptr));
- } else {
- read_unlock(STATUSPTR(headptr));
- }
- }
- if(v_nomatch > 0)
- free(oidlocked);
+ int useWriteUnlock = 0;
+ for(j = 0; j < objlocked; j++) {
+ if(oidlocked[j] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ if(useWriteUnlock) {
+ write_unlock(STATUSPTR(headptr));
+ } else {
+ read_unlock(STATUSPTR(headptr));
+ }
+ }
+ if(v_nomatch > 0)
+ free(oidlocked);
}
objlocked=0;
break;
}
#endif
/*
- if (objlocked > 0) {
- int useWriteUnlock = 0;
- for(j = 0; j < objlocked; j++) {
+ if (objlocked > 0) {
+ int useWriteUnlock = 0;
+ for(j = 0; j < objlocked; j++) {
if(oidlocked[j] == -1) {
useWriteUnlock = 1;
continue;
} else {
read_unlock(STATUSPTR(headptr));
}
- }
- free(oidlocked);
- }
- */
+ }
+ free(oidlocked);
+ }
+ */
control=TRANS_DISAGREE;
send_data(acceptfd, &control, sizeof(char));
#ifdef CACHE
if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
if (version == ((objheader_t *)mobj)->version) { /* match versions */
(*v_matchnolock)++;
- *control = TRANS_AGREE;
+ *control = TRANS_AGREE;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[*objvernotmatch] = oid;
} else { //we are locked
if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
(*v_matchlock)++;
- *control=TRANS_SOFT_ABORT;
+ *control=TRANS_SOFT_ABORT;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[*objvernotmatch] = oid;
/* Save the oids not found and number of oids not found for later use */
oidnotfound[*objnotfound] = oid;
(*objnotfound)++;
- *control = TRANS_DISAGREE;
+ *control = TRANS_DISAGREE;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
if (version == ((objheader_t *)mobj)->version) { /* match versions */
(*v_matchnolock)++;
- *control=TRANS_AGREE;
+ *control=TRANS_AGREE;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[(*objvernotmatch)++] = oid;
} else { /* Some other transaction has aquired a write lock on this object */
if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
(*v_matchlock)++;
- *control=TRANS_SOFT_ABORT;
+ *control=TRANS_SOFT_ABORT;
} else { /* If versions don't match ...HARD ABORT */
(*v_nomatch)++;
oidvernotmatch[*objvernotmatch] = oid;
return *control;
}
-void procRestObjs(char *objread,
- char *objmod,
- int index,
- int numread,
- int nummod,
- unsigned int *oidnotfound,
+void procRestObjs(char *objread,
+ char *objmod,
+ int index,
+ int numread,
+ int nummod,
+ unsigned int *oidnotfound,
unsigned int *oidvernotmatch,
- int *objnotfound,
- int *objvernotmatch,
- int *v_nomatch,
+ int *objnotfound,
+ int *objvernotmatch,
+ int *v_nomatch,
int *numBytes) {
int i;
unsigned int oid;
objmod += sizeof(objheader_t) + tmpsize;
}
processVerNoMatch(oidnotfound,
- oidvernotmatch,
- objnotfound,
- objvernotmatch,
- v_nomatch,
- numBytes,
- oid,
- version);
+ oidvernotmatch,
+ objnotfound,
+ objvernotmatch,
+ v_nomatch,
+ numBytes,
+ oid,
+ version);
}
return;
}
-void processVerNoMatch(unsigned int *oidnotfound,
- unsigned int *oidvernotmatch,
- int *objnotfound,
- int *objvernotmatch,
- int *v_nomatch,
- int *numBytes,
- unsigned int oid,
- unsigned short version) {
+void processVerNoMatch(unsigned int *oidnotfound,
+ unsigned int *oidvernotmatch,
+ int *objnotfound,
+ int *objvernotmatch,
+ int *v_nomatch,
+ int *numBytes,
+ unsigned int oid,
+ unsigned short version) {
void *mobj;
/* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
if (version != ((objheader_t *)mobj)->version) { /* match versions */
(*v_nomatch)++;
oidvernotmatch[*objvernotmatch] = oid;
- (*objvernotmatch)++;
- int size;
+ (*objvernotmatch)++;
+ int size;
GETSIZE(size, mobj);
size += sizeof(objheader_t);
*numBytes += size;
}
}
- /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
- * to send to Coordinator based on the votes of oids involved in the transaction */
- char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
- int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
- unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
- int val;
- char control = 0;
-
- /* Condition to send TRANS_AGREE */
- if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
- control = TRANS_AGREE;
- /* Send control message */
- send_data(acceptfd, &control, sizeof(char));
- }
- /* Condition to send TRANS_SOFT_ABORT */
- if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
- control = TRANS_SOFT_ABORT;
-
- /* Send control message */
- send_data(acceptfd, &control, sizeof(char));
-
- /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
- if(*(objnotfound) != 0) {
- int msg[1];
- msg[0] = *(objnotfound);
- send_data(acceptfd, &msg, sizeof(int));
- int size = sizeof(unsigned int)* *(objnotfound);
- send_data(acceptfd, oidnotfound, size);
- }
- }
+/* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
+ * to send to Coordinator based on the votes of oids involved in the transaction */
+char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
+ int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
+ unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
+ int val;
+ char control = 0;
+
+ /* Condition to send TRANS_AGREE */
+ if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
+ control = TRANS_AGREE;
+ /* Send control message */
+ send_data(acceptfd, &control, sizeof(char));
+ }
+ /* Condition to send TRANS_SOFT_ABORT */
+ if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
+ control = TRANS_SOFT_ABORT;
- /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
- * if Participant receives a TRANS_COMMIT */
- transinfo->objlocked = oidlocked;
- transinfo->objnotfound = oidnotfound;
- transinfo->modptr = modptr;
- transinfo->numlocked = *(objlocked);
- transinfo->numnotfound = *(objnotfound);
- return control;
+ /* Send control message */
+ send_data(acceptfd, &control, sizeof(char));
+
+ /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
+ if(*(objnotfound) != 0) {
+ int msg[1];
+ msg[0] = *(objnotfound);
+ send_data(acceptfd, &msg, sizeof(int));
+ int size = sizeof(unsigned int)* *(objnotfound);
+ send_data(acceptfd, oidnotfound, size);
+ }
}
- /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
- * addresses in lookup table and also changes version number
- * Sends an ACK back to Coordinator */
- int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
- objheader_t *header;
- objheader_t *newheader;
- int i = 0, offset = 0;
- char control;
- int tmpsize;
-
- /* Process each modified object saved in the mainobject store */
- for(i = 0; i < nummod; i++) {
- if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
- printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- GETSIZE(tmpsize,header);
-
- {
- struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
- struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
- dst->type=src->type;
- dst->___cachedCode___=src->___cachedCode___;
- dst->___cachedHash___=src->___cachedHash___;
- memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
- }
+ /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
+ * if Participant receives a TRANS_COMMIT */
+ transinfo->objlocked = oidlocked;
+ transinfo->objnotfound = oidnotfound;
+ transinfo->modptr = modptr;
+ transinfo->numlocked = *(objlocked);
+ transinfo->numnotfound = *(objnotfound);
+ return control;
+}
+
+/* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
+ * addresses in lookup table and also changes version number
+ * Sends an ACK back to Coordinator */
+int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
+ objheader_t *header;
+ objheader_t *newheader;
+ int i = 0, offset = 0;
+ char control;
+ int tmpsize;
+
+ /* Process each modified object saved in the mainobject store */
+ for(i = 0; i < nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ GETSIZE(tmpsize,header);
+
+ {
+ struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
+ struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
+ dst->type=src->type;
+ dst->___cachedCode___=src->___cachedCode___;
+ dst->___cachedHash___=src->___cachedHash___;
+ memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
+ }
- //memory barrier
- CFENCE;
+ //memory barrier
+ CFENCE;
- header->version += 1;
- /* If threads are waiting on this object to be updated, notify them */
- if(header->notifylist != NULL) {
- notifyAll(&header->notifylist, OID(header), header->version);
- }
- offset += sizeof(objheader_t) + tmpsize;
+ header->version += 1;
+ /* If threads are waiting on this object to be updated, notify them */
+ if(header->notifylist != NULL) {
+ notifyAll(&header->notifylist, OID(header), header->version);
}
+ offset += sizeof(objheader_t) + tmpsize;
+ }
if (nummod > 0)
free(modptr);
short offsetarry[numoffset];
recv_data_buf((int)acceptfd, readbuffer, &gid, sizeof(int));
recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short));
- LOGTIME('A',oid ,0,myrdtsc(),gid); //after recv the entire prefetch request
+ LOGTIME('A',oid,0,myrdtsc(),gid); //after recv the entire prefetch request
/*Process each oid */
if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
/* Save the oids not found in buffer for later use */
- size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int);
char sendbuffer[size+1];
sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
*((int *) (sendbuffer+sizeof(char))) = size;
LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request
/* Calculate the oid corresponding to the offset value */
- for(i = 0 ; i< numoffset ; i++) {
+ for(i = 0; i< numoffset; i++) {
/* Check for arrays */
if(TYPE(header) >= NUMCLASSES) {
int elementsize = classsize[TYPE(header)];
if (oid==0)
break;
- LOGTIME('B',oid,0,myrdtsc(),gid); //send next oid found from prefetch request
+ LOGTIME('B',oid,0,myrdtsc(),gid); //send next oid found from prefetch request
if((header = mhashSearch(oid)) == NULL) {
- size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ size = sizeof(int)+sizeof(int) + sizeof(char) + sizeof(unsigned int);
char sendbuffer[size+1];
sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
*((int *) (sendbuffer+1)) = size;
*((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
*((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid;
- *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid;
+ *((int *)(sendbuffer+sizeof(int) + sizeof(char)+sizeof(char)+sizeof(unsigned int))) = gid;
send_buf(sd, &writebuffer, sendbuffer, size+1);
- LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request
+ LOGTIME('J',oid, 0,myrdtsc(), gid); //send first oid not found prefetch request
break;
} else { /* Obj Found */
int incr = 1;
incr += sizeof(char);
*((unsigned int *)(sendbuffer+incr)) = oid;
incr += sizeof(unsigned int);
- *((int *)(sendbuffer+incr)) = gid;
- incr += sizeof(int);
+ *((int *)(sendbuffer+incr)) = gid;
+ incr += sizeof(int);
memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
send_buf(sd, &writebuffer, sendbuffer, size+1);
- LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset));
- LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request
+ LOGOIDTYPE("SRES", oid, TYPE(header), (myrdtsc()-clockoffset));
+ LOGTIME('C',oid,TYPE(header),myrdtsc(), gid); //send first oid found from prefetch request
}
} //end of for
}
} //end of while
- //Release socket
+ //Release socket
if (mid!=-1) {
forcesend_buf(sd, &writebuffer, NULL, 0);
freeSockWithLock(transPResponseSocketPool, mid, sd);
close(sd);
return;
} else {
-
+
//Send Update notification
msg[0] = THREAD_NOTIFY_RESPONSE;
*((unsigned int *)&msg[1]) = oid;