X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=Robust%2Fsrc%2FRuntime%2FDSTM%2Finterface%2Ftrans.c;h=ec852ce3bde1b9d06e9ad856d81f3db32eec4d01;hb=8e068bd8a860ab9c1ef8a03f98cbfa57a958906b;hp=43bcbf8736a11b34224cd3a0eb16e27f0be9a572;hpb=9585036db84b6e3f4b36aeaad4d9b42e5c94ee27;p=IRC.git diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 43bcbf87..ec852ce3 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -1,10 +1,11 @@ #include "dstm.h" +#include "debugmacro.h" #include "ip.h" #include "machinepile.h" -#include "mlookup.h" +#include "altmlookup.h" #include "llookup.h" #include "plookup.h" -#include "prelookup.h" +#include "altprelookup.h" #include "threadnotify.h" #include "queue.h" #include "addUdpEnhance.h" @@ -18,10 +19,61 @@ #ifdef ABORTREADERS #include "abortreaders.h" #endif +#include "trans.h" #define NUM_THREADS 1 #define CONFIG_FILENAME "dstm.conf" +//#define LOGEVENTS //turn on Logging events +#ifdef LOGEVENTS +char bigarray[16*1024*1024]; +int bigindex=0; +#define LOGEVENT(x) { \ + int tmp=bigindex++; \ + bigarray[tmp]=x; \ + } +#else +#define LOGEVENT(x) +#endif + +//#define LOGTIMES +#ifdef LOGTIMES +char bigarray1[6*1024*1024]; +unsigned int bigarray2[6*1024*1024]; +unsigned int bigarray3[6*1024*1024]; +long long bigarray4[6*1024*1024]; +int bigarray5[6*1024*1024]; +int bigindex1=0; +#define LOGTIME(x,y,z,a,b) {\ + int tmp=bigindex1; \ + bigarray1[tmp]=x; \ + bigarray2[tmp]=y; \ + bigarray3[tmp]=z; \ + bigarray4[tmp]=a; \ + bigarray5[tmp]=b; \ + bigindex1++; \ +} +#else +#define LOGTIME(x,y,z,a,b) +#endif + +/* Thread transaction variables */ +__thread objstr_t *t_cache; +__thread struct ___Object___ *revertlist; +__thread struct timespec exponential_backoff; +__thread int count_exponential_backoff; +__thread const int max_exponential_backoff = 1000; // safety limit +#ifdef SANDBOX +__thread int trans_allocation_bytes; +#endif + + +#ifdef ABORTREADERS +__thread int t_abort; +__thread jmp_buf aborttrans; +#endif + +int globalid=0; /* This variable is a unique global identifier for a sendPrefetch request */ /* Global Variables */ extern int classsize[]; @@ -42,7 +94,6 @@ int myIndexInHostArray; unsigned int oidsPerBlock; unsigned int oidMin; unsigned int oidMax; - sockPoolHashTable_t *transReadSockPool; sockPoolHashTable_t *transPrefetchSockPool; sockPoolHashTable_t *transRequestSockPool; @@ -57,13 +108,17 @@ int numTransAbort = 0; int nchashSearch = 0; int nmhashSearch = 0; int nprehashSearch = 0; +int ndirtyCacheObj = 0; int nRemoteSend = 0; int nSoftAbort = 0; int bytesSent = 0; int bytesRecv = 0; +int totalObjSize = 0; +int sendRemoteReq = 0; +int getResponse = 0; void printhex(unsigned char *, int); -plistnode_t *createPiles(transrecord_t *); +plistnode_t *createPiles(); plistnode_t *sortPiles(plistnode_t *pileptr); /******************************* @@ -85,12 +140,125 @@ void send_data(int fd, void *buf, int buflen) { } } +void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) { + if (buflen+sendbuffer->offset>WMAXBUF) { + send_data(fd, sendbuffer->buf, sendbuffer->offset); + sendbuffer->offset=0; + send_data(fd, buffer, buflen); + return; + } + memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen); + sendbuffer->offset+=buflen; + if (sendbuffer->offset>WTOP) { + send_data(fd, sendbuffer->buf, sendbuffer->offset); + sendbuffer->offset=0; + } +} + +void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) { + if (buflen+sendbuffer->offset>WMAXBUF) { + send_data(fd, sendbuffer->buf, sendbuffer->offset); + sendbuffer->offset=0; + send_data(fd, buffer, buflen); + return; + } + memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen); + sendbuffer->offset+=buflen; + send_data(fd, sendbuffer->buf, sendbuffer->offset); + sendbuffer->offset=0; +} + +int recvw(int fd, void *buf, int len, int flags) { + return recv(fd, buf, len, flags); +} + +void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) { + char *buf=(char *)buffer; + int numbytes=readbuffer->head-readbuffer->tail; + if (numbytes>buflen) + numbytes=buflen; + if (numbytes>0) { + memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes); + readbuffer->tail+=numbytes; + buflen-=numbytes; + buf+=numbytes; + } + if (buflen==0) { + return; + } + if (buflen>=MAXBUF) { + recv_data(fd, buf, buflen); + return; + } + + int maxbuf=MAXBUF; + int obufflen=buflen; + readbuffer->head=0; + + while (buflen > 0) { + int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0); + if (numbytes == -1) { + perror("recv"); + exit(0); + } + bytesRecv+=numbytes; + buflen-=numbytes; + readbuffer->head+=numbytes; + maxbuf-=numbytes; + } + memcpy(buf,readbuffer->buf,obufflen); + readbuffer->tail=obufflen; +} + +int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) { + char *buf=(char *)buffer; + //now tail<=head + int numbytes=readbuffer->head-readbuffer->tail; + if (numbytes>buflen) + numbytes=buflen; + if (numbytes>0) { + memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes); + readbuffer->tail+=numbytes; + buflen-=numbytes; + buf+=numbytes; + } + if (buflen==0) + return 1; + + if (buflen>=MAXBUF) { + return recv_data_errorcode(fd, buf, buflen); + } + + int maxbuf=MAXBUF; + int obufflen=buflen; + readbuffer->head=0; + + while (buflen > 0) { + int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0); + if (numbytes ==0) { + return 0; + } + if (numbytes==-1) { + perror("recvbuf"); + return -1; + } + bytesRecv+=numbytes; + buflen-=numbytes; + readbuffer->head+=numbytes; + maxbuf-=numbytes; + } + memcpy(buf,readbuffer->buf,obufflen); + readbuffer->tail=obufflen; + return 1; +} + + void recv_data(int fd, void *buf, int buflen) { char *buffer = (char *)(buf); int size = buflen; int numbytes; while (size > 0) { - numbytes = recv(fd, buffer, size, 0); + numbytes = recvw(fd, buffer, size, 0); bytesRecv = bytesRecv + numbytes; if (numbytes == -1) { perror("recv"); @@ -106,13 +274,14 @@ int recv_data_errorcode(int fd, void *buf, int buflen) { int size = buflen; int numbytes; while (size > 0) { - numbytes = recv(fd, buffer, size, 0); + numbytes = recvw(fd, buffer, size, 0); if (numbytes==0) return 0; if (numbytes == -1) { perror("recv"); return -1; } + bytesRecv+=numbytes; buffer += numbytes; size -= numbytes; } @@ -149,17 +318,32 @@ inline int findmax(int *array, int arraylength) { return max; } +#define INLINEPREFETCH +#define PREFTHRESHOLD 0 + /* This function is a prefetch call generated by the compiler that * populates the shared primary prefetch queue*/ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) { /* Allocate for the queue node*/ int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); int len; - char * node= getmemory(qnodesize); +#ifdef INLINEPREFETCH + int attempted=0; + char *node; + do { + node=getmemory(qnodesize); + if (node==NULL&&attempted) + break; + if (node!=NULL) { +#else + char *node=getmemory(qnodesize); +#endif int top=endoffsets[ntuples-1]; - if (node==NULL) + if (node==NULL) { + LOGEVENT('D'); return; + } /* Set queue node values */ /* TODO: Remove this after testing */ @@ -172,8 +356,36 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short)); memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short)); +#ifdef INLINEPREFETCH + movehead(qnodesize); + } + int numpref=numavailable(); + attempted=1; + + if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) { + node=gettail(); + prefetchpile_t *pilehead = foundLocal(node,numpref,siteid); + if (pilehead!=NULL) { + // Get sock from shared pool + + /* Send Prefetch Request */ + prefetchpile_t *ptr = pilehead; + while(ptr != NULL) { + globalid++; + int sd = getSock2(transPrefetchSockPool, ptr->mid); + sendPrefetchReq(ptr, sd, globalid); + ptr = ptr->next; + } + + mcdealloc(pilehead); + } + resetqueue(); + }//end do prefetch if condition + } while(node==NULL); +#else /* Lock and insert into primary prefetch queue */ movehead(qnodesize); +#endif } /* This function starts up the transaction runtime. */ @@ -289,12 +501,16 @@ void transInit() { retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL); } while(retval!=0); #else +#ifndef INLINEPREFETCH do { retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); } while(retval!=0); #endif +#endif +#ifndef INLINEPREFETCH pthread_detach(tPrefetch); #endif +#endif } /* This function stops the threads spawned */ @@ -322,19 +538,28 @@ void randomdelay() { return; } -/* This function initializes things required in the transaction start*/ -__attribute__((malloc)) transrecord_t *transStart() { - transrecord_t *tmp; - if((tmp = calloc(1, sizeof(transrecord_t))) == NULL) { - printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__); - return NULL; +void exponentialdelay() { + exponential_backoff.tv_nsec = exponential_backoff.tv_nsec * 2; + nanosleep(&exponential_backoff, NULL); + ++count_exponential_backoff; + if (count_exponential_backoff >= max_exponential_backoff) { + printf(" reached max_exponential_backoff at %s, %s(), %d\n", __FILE__, __func__, __LINE__); + exit(-1); } - tmp->cache = objstrCreate(1048576); - tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR); - //#ifdef COMPILER - // tmp->revertlist=NULL; //Not necessary...already null - //#endif - return tmp; + return; +} + +/* This function initializes things required in the transaction start*/ +void transStart() { + t_cache = objstrCreate(1048576); + t_chashCreate(CHASH_SIZE, CLOADFACTOR); + revertlist=NULL; +#ifdef SANDBOX + trans_allocation_bytes = 0; +#endif +#ifdef ABORTREADERS + t_abort=0; +#endif } // Search for an address for a given oid @@ -355,22 +580,24 @@ INLINE void * chashSearchI(chashtable_t *table, unsigned int key) { }*/ + + /* This function finds the location of the objects involved in a transaction * and returns the pointer to the object if found in a remote location */ -__attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int oid) { +__attribute__((pure)) objheader_t *transRead(unsigned int oid) { unsigned int machinenumber; objheader_t *tmp, *objheader; objheader_t *objcopy; int size; void *buf; chashlistnode_t *node; - chashtable_t *table=record->lookupTable; if(oid == 0) { return NULL; } - node= &table->table[(oid & table->mask)>>1]; + + node= &c_table[(oid & c_mask)>>1]; do { if(node->key == oid) { #ifdef TRANSSTATS @@ -400,14 +627,14 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int */ #ifdef ABORTREADERS - if (record->abort) { + if (t_abort) { //abort this transaction - printf("ABORTING\n"); - objstrDelete(record->cache); - chashDelete(record->lookupTable); - _longjmp(record->aborttrans,1); + removetransactionhash(); + objstrDelete(t_cache); + t_chashDelete(); + _longjmp(aborttrans,1); } else - addtransaction(oid,record); + addtransaction(oid); #endif if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { @@ -417,11 +644,11 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int /* Look up in machine lookup table and copy into cache*/ GETSIZE(size, objheader); size += sizeof(objheader_t); - objcopy = (objheader_t *) objstrAlloc(&record->cache, size); + objcopy = (objheader_t *) objstrAlloc(&t_cache, size); memcpy(objcopy, objheader, size); /* Insert into cache's lookup table */ STATUS(objcopy)=0; - chashInsert(record->lookupTable, OID(objheader), objcopy); + t_chashInsert(OID(objheader), objcopy); #ifdef COMPILER return &objcopy[1]; #else @@ -430,29 +657,36 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int } else { #ifdef CACHE if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { + if(STATUS(tmp) & DIRTY) { +#ifdef TRANSSTATS + ndirtyCacheObj++; +#endif + goto remoteread; + } #ifdef TRANSSTATS nprehashSearch++; #endif /* Look up in prefetch cache */ GETSIZE(size, tmp); size+=sizeof(objheader_t); - objcopy = (objheader_t *) objstrAlloc(&record->cache, size); + objcopy = (objheader_t *) objstrAlloc(&t_cache, size); memcpy(objcopy, tmp, size); /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, OID(tmp), objcopy); + t_chashInsert(OID(tmp), objcopy); #ifdef COMPILER return &objcopy[1]; #else return objcopy; #endif } +remoteread: #endif /* Get the object from the remote location */ if((machinenumber = lhashSearch(oid)) == 0) { printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__); return NULL; } - objcopy = getRemoteObj(record, machinenumber, oid); + objcopy = getRemoteObj(machinenumber, oid); if(objcopy == NULL) { printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); @@ -462,6 +696,135 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int nRemoteSend++; #endif #ifdef COMPILER +#ifdef CACHE + //Copy object to prefetch cache + pthread_mutex_lock(&prefetchcache_mutex); + objheader_t *headerObj; + int size; + GETSIZE(size, objcopy); + if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) { + printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__, + __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); + return NULL; + } + pthread_mutex_unlock(&prefetchcache_mutex); + memcpy(headerObj, objcopy, size+sizeof(objheader_t)); + //make an entry in prefetch lookup hashtable + prehashInsert(oid, headerObj); + LOGEVENT('B'); +#endif + return &objcopy[1]; +#else + return objcopy; +#endif + } + } +} + + +/* This function finds the location of the objects involved in a transaction + * and returns the pointer to the object if found in a remote location */ +__attribute__((pure)) objheader_t *transRead2(unsigned int oid) { +//DEBUG: __attribute__((pure)) objheader_t *transRead2(unsigned int oid, char tmpptr[]) { + unsigned int machinenumber; + objheader_t *tmp, *objheader; + objheader_t *objcopy; + int size; + +#ifdef ABORTREADERS + if (t_abort) { + //abort this transaction + removetransactionhash(); + objstrDelete(t_cache); + t_chashDelete(); + _longjmp(aborttrans,1); + } else + addtransaction(oid); +#endif + + if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { +#ifdef TRANSSTATS + nmhashSearch++; +#endif + /* Look up in machine lookup table and copy into cache*/ + GETSIZE(size, objheader); + size += sizeof(objheader_t); + objcopy = (objheader_t *) objstrAlloc(&t_cache, size); + memcpy(objcopy, objheader, size); + /* Insert into cache's lookup table */ + STATUS(objcopy)=0; + t_chashInsert(OID(objheader), objcopy); +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif + } else { +#ifdef CACHE + if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { + if(STATUS(tmp) & DIRTY) { +#ifdef TRANSSTATS + ndirtyCacheObj++; +#endif + goto remoteread; + } +#ifdef TRANSSTATS + LOGEVENT('P') + nprehashSearch++; +#endif + /* Look up in prefetch cache */ + GETSIZE(size, tmp); + size+=sizeof(objheader_t); + objcopy = (objheader_t *) objstrAlloc(&t_cache, size); + memcpy(objcopy, tmp, size); + LOGOIDTYPE("P",oid, TYPE(objcopy), myrdtsc()); + /* Insert into cache's lookup table */ + t_chashInsert(OID(tmp), objcopy); +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif + } +remoteread: +#endif + /* Get the object from the remote location */ + if((machinenumber = lhashSearch(oid)) == 0) { + printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__); + return NULL; + } + objcopy = getRemoteObj(machinenumber, oid); +#ifdef TRANSSTATS + LOGEVENT('R'); + nRemoteSend++; +#endif + + if(objcopy == NULL) { + printf("Error: Object %u not found in Remote location %s, %d\n", oid,__FILE__, __LINE__); + return NULL; + } else { +#ifdef COMPILER +#ifdef CACHE + LOGOIDTYPE("RR",oid, TYPE(objcopy),myrdtsc()); + LOGTIME('r', oid, TYPE(objcopy),myrdtsc(),0); + //Copy object to prefetch cache + pthread_mutex_lock(&prefetchcache_mutex); + objheader_t *headerObj; + int size; + GETSIZE(size, objcopy); + if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) { + printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__, + __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); + return NULL; + } + pthread_mutex_unlock(&prefetchcache_mutex); + memcpy(headerObj, objcopy, size+sizeof(objheader_t)); + //make an entry in prefetch lookup hashtable + prehashInsert(oid, headerObj); + LOGEVENT('B'); +#endif return &objcopy[1]; #else return objcopy; @@ -471,13 +834,20 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int } /* This function creates objects in the transaction record */ -objheader_t *transCreateObj(transrecord_t *record, unsigned int size) { - objheader_t *tmp = (objheader_t *) objstrAlloc(&record->cache, (sizeof(objheader_t) + size)); +objheader_t *transCreateObj(unsigned int size) { + objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size)); OID(tmp) = getNewOID(); tmp->version = 1; tmp->rcount = 1; STATUS(tmp) = NEW; - chashInsert(record->lookupTable, OID(tmp), tmp); + t_chashInsert(OID(tmp), tmp); +#ifdef SANDBOX + trans_allocation_bytes += size; + /* Validate the read set if allocation is exceeds threshold */ + if(trans_allocation_bytes > MEM_ALLOC_THRESHOLD) { + check_mem_alloc(); + } +#endif #ifdef COMPILER return &tmp[1]; //want space after object header @@ -489,14 +859,14 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned int size) { #if 1 /* This function creates machine piles based on all machines involved in a * transaction commit request */ -plistnode_t *createPiles(transrecord_t *record) { +plistnode_t *createPiles() { int i; plistnode_t *pile = NULL; unsigned int machinenum; objheader_t *headeraddr; - chashlistnode_t * ptr = record->lookupTable->table; + chashlistnode_t * ptr = c_table; /* Represents number of bins in the chash table */ - unsigned int size = record->lookupTable->size; + unsigned int size = c_size; for(i = 0; i < size ; i++) { chashlistnode_t * curr = &ptr[i]; @@ -516,7 +886,7 @@ plistnode_t *createPiles(transrecord_t *record) { } //Make machine groups - pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements); + pile = pInsert(pile, headeraddr, machinenum, c_numelements); curr = curr->next; } } @@ -525,14 +895,14 @@ plistnode_t *createPiles(transrecord_t *record) { #else /* This function creates machine piles based on all machines involved in a * transaction commit request */ -plistnode_t *createPiles(transrecord_t *record) { +plistnode_t *createPiles() { int i; plistnode_t *pile = NULL; unsigned int machinenum; objheader_t *headeraddr; - struct chashentry * ptr = record->lookupTable->table; + struct chashentry * ptr = c_table; /* Represents number of bins in the chash table */ - unsigned int size = record->lookupTable->size; + unsigned int size = c_size; for(i = 0; i < size ; i++) { struct chashentry * curr = & ptr[i]; @@ -551,7 +921,7 @@ plistnode_t *createPiles(transrecord_t *record) { } //Make machine groups - pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements); + pile = pInsert(pile, headeraddr, machinenum, c_numelements); } return pile; } @@ -562,35 +932,57 @@ plistnode_t *createPiles(transrecord_t *record) { * and creates new piles by calling the createPiles(), * Sends a transrequest() to each remote machines for objects found remotely * and calls handleLocalReq() to process objects found locally */ -int transCommit(transrecord_t *record) { +int transCommit() { + //char buffer[30]; unsigned int tot_bytes_mod, *listmid; plistnode_t *pile, *pile_ptr; - int trecvcount; char treplyretry; /* keeps track of the common response that needs to be sent */ int firsttime=1; trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */ char finalResponse; +#ifdef SANDBOX + abortenabled=0; +#endif + struct writestruct writebuffer; + writebuffer.offset=0; + +#ifdef LOGEVENTS + int iii; + for(iii=0;iiiabort) { + if (t_abort) { //abort this transaction - printf("ABORTING TRANSACTION AT COMMIT\n"); - objstrDelete(record->cache); - chashDelete(record->lookupTable); - free(record); + removetransactionhash(); + objstrDelete(t_cache); + t_chashDelete(); return 1; } #endif + int treplyretryCount = 0; + /* Initialize timeout for exponential delay */ + exponential_backoff.tv_sec = 0; + exponential_backoff.tv_nsec = (long)(10000);//10 microsec + count_exponential_backoff = 0; do { - trecvcount = 0; treplyretry = 0; /* Look through all the objects in the transaction record and make piles * for each machine involved in the transaction*/ if (firsttime) { - pile_ptr = pile = createPiles(record); + pile_ptr = pile = createPiles(); pile_ptr = pile = sortPiles(pile); } else { pile = pile_ptr; @@ -608,12 +1000,12 @@ int transCommit(transrecord_t *record) { /* Create a socket and getReplyCtrl array, initialize */ int socklist[pilecount]; + char getReplyCtrl[pilecount]; int loopcount; - for(loopcount = 0 ; loopcount < pilecount; loopcount++) + for(loopcount = 0 ; loopcount < pilecount; loopcount++){ socklist[loopcount] = 0; - char getReplyCtrl[pilecount]; - for(loopcount = 0 ; loopcount < pilecount; loopcount++) getReplyCtrl[loopcount] = 0; + } /* Process each machine pile */ int sockindex = 0; @@ -640,18 +1032,18 @@ int transCommit(transrecord_t *record) { } socklist[sockindex] = sd; /* Send bytes of data with TRANS_REQUEST control message */ - send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t)); + send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t)); /* Send list of machines involved in the transaction */ { int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount); - send_data(sd, tosend[sockindex].listmid, size); + send_buf(sd, &writebuffer, tosend[sockindex].listmid, size); } /* Send oids and version number tuples for objects that are read */ { int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread); - send_data(sd, tosend[sockindex].objread, size); + send_buf(sd, &writebuffer, tosend[sockindex].objread, size); } /* Send objects that are modified */ @@ -667,7 +1059,7 @@ int transCommit(transrecord_t *record) { for(i = 0; i < tosend[sockindex].f.nummod ; i++) { int size; objheader_t *headeraddr; - if((headeraddr = chashSearch(record->lookupTable, tosend[sockindex].oidmod[i])) == NULL) { + if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) { printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__); free(modptr); free(listmid); @@ -679,14 +1071,15 @@ int transCommit(transrecord_t *record) { memcpy(modptr+offset, headeraddr, size); offset+=size; } - send_data(sd, modptr, tosend[sockindex].f.sum_bytes); + forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes); free(modptr); } else { //handle request locally - handleLocalReq(&tosend[sockindex], &transinfo, record, &getReplyCtrl[sockindex]); + handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]); } sockindex++; pile = pile->next; } //end of pile processing + /* Recv Ctrl msgs from all machines */ int i; for(i = 0; i < pilecount; i++) { @@ -723,13 +1116,8 @@ int transCommit(transrecord_t *record) { GETSIZE(size, header); size += sizeof(objheader_t); //make an entry in prefetch hash table - void *oldptr; - if((oldptr = prehashSearch(oidToPrefetch)) != NULL) { - prehashRemove(oidToPrefetch); - prehashInsert(oidToPrefetch, header); - } else { - prehashInsert(oidToPrefetch, header); - } + prehashInsert(oidToPrefetch, header); + LOGEVENT('E'); length = length - size; offset += size; } @@ -737,14 +1125,27 @@ int transCommit(transrecord_t *record) { #endif } } + /* Decide the final response */ - if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, record, pilecount)) == 0) { + if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); free(tosend); free(listmid); return 1; } +#ifdef CACHE + if (finalResponse == TRANS_COMMIT) { + /* Invalidate objects in other machine cache */ + int retval; + if((retval = invalidateObj(tosend, pilecount,finalResponse,socklist)) != 0) { + printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); + free(tosend); + free(listmid); + return 1; + } + } +#endif /* Send responses to all machines */ for(i = 0; i < pilecount; i++) { int sd = socklist[i]; @@ -753,46 +1154,35 @@ int transCommit(transrecord_t *record) { if(finalResponse == TRANS_COMMIT) { int retval; /* Update prefetch cache */ - if((retval = updatePrefetchCache(&(tosend[i]), record)) != 0) { + if((retval = updatePrefetchCache(&(tosend[i]))) != 0) { printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); free(tosend); free(listmid); return 1; } - - - /* Invalidate objects in other machine cache */ - if(tosend[i].f.nummod > 0) { - if((retval = invalidateObj(&(tosend[i]))) != 0) { - printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); - free(tosend); - free(listmid); - return 1; - } - } #ifdef ABORTREADERS removetransaction(tosend[i].oidmod,tosend[i].f.nummod); - removethisreadtransaction(tosend[i].objread, tosend[i].f.numread, record); + removethisreadtransaction(tosend[i].objread, tosend[i].f.numread); #endif } #ifdef ABORTREADERS else if (!treplyretry) { - removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record); - removethisreadtransaction(tosend[i].objread,tosend[i].f.numread,record); + removethistransaction(tosend[i].oidmod,tosend[i].f.nummod); + removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); } #endif #endif send_data(sd, &finalResponse, sizeof(char)); } else { /* Complete local processing */ - doLocalProcess(finalResponse, &(tosend[i]), &transinfo, record); + doLocalProcess(finalResponse, &(tosend[i]), &transinfo); #ifdef ABORTREADERS if(finalResponse == TRANS_COMMIT) { removetransaction(tosend[i].oidmod,tosend[i].f.nummod); - removethisreadtransaction(tosend[i].objread,tosend[i].f.numread, record); + removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); } else if (!treplyretry) { - removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record); - removethisreadtransaction(tosend[i].objread,tosend[i].f.numread,record); + removethistransaction(tosend[i].oidmod,tosend[i].f.nummod); + removethisreadtransaction(tosend[i].objread,tosend[i].f.numread); } #endif } @@ -805,6 +1195,10 @@ int transCommit(transrecord_t *record) { pDelete(pile_ptr); /* wait a random amount of time before retrying to commit transaction*/ if(treplyretry) { + treplyretryCount++; + // if(treplyretryCount >= NUM_TRY_TO_COMMIT) + // exponentialdelay(); + // else randomdelay(); #ifdef TRANSSTATS nSoftAbort++; @@ -814,23 +1208,25 @@ int transCommit(transrecord_t *record) { } while (treplyretry); if(finalResponse == TRANS_ABORT) { - //printf("Aborting trans\n"); #ifdef TRANSSTATS + LOGEVENT('A'); numTransAbort++; #endif /* Free Resources */ - objstrDelete(record->cache); - chashDelete(record->lookupTable); - free(record); + objstrDelete(t_cache); + t_chashDelete(); +#ifdef SANDBOX + abortenabled=1; +#endif return TRANS_ABORT; } else if(finalResponse == TRANS_COMMIT) { #ifdef TRANSSTATS + LOGEVENT('C'); numTransCommit++; #endif /* Free Resources */ - objstrDelete(record->cache); - chashDelete(record->lookupTable); - free(record); + objstrDelete(t_cache); + t_chashDelete(); return 0; } else { //TODO Add other cases @@ -843,7 +1239,7 @@ int transCommit(transrecord_t *record) { /* This function handles the local objects involved in a transaction * commiting process. It also makes a decision if this local machine * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */ -void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec, char *getReplyCtrl) { +void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) { unsigned int *oidnotfound = NULL, *oidlocked = NULL; int numoidnotfound = 0, numoidlocked = 0; int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; @@ -870,7 +1266,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra } int tmpsize; objheader_t *headptr; - headptr = (objheader_t *) chashSearch(rec->lookupTable, tdata->oidmod[i-numread]); + headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]); if (headptr == NULL) { printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__); return; @@ -888,7 +1284,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra transinfo->modptr = NULL; transinfo->numlocked = numoidlocked; transinfo->numnotfound = numoidnotfound; - + /* Condition to send TRANS_AGREE */ if(v_matchnolock == tdata->f.numread + tdata->f.nummod) { *getReplyCtrl = TRANS_AGREE; @@ -899,7 +1295,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra } } -void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *record) { +void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) { if(finalResponse == TRANS_ABORT) { if(transAbortProcess(transinfo) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); @@ -907,17 +1303,7 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da return; } } else if(finalResponse == TRANS_COMMIT) { -#ifdef CACHE - /* Invalidate objects in other machine cache */ - if(tdata->f.nummod > 0) { - int retval; - if((retval = invalidateObj(tdata)) != 0) { - printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); - return; - } - } -#endif - if(transComProcess(tdata, transinfo, record) != 0) { + if(transComProcess(tdata, transinfo) != 0) { printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); fflush(stdout); return; @@ -937,7 +1323,7 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da /* This function decides the reponse that needs to be sent to * all Participant machines after the TRANS_REQUEST protocol */ -char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record, int pilecount) { +char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) { int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what message to send */ for (i = 0 ; i < pilecount; i++) { @@ -945,7 +1331,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record control = getReplyCtrl[i]; switch(control) { default: - printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); + printf("Participant sent unknown message %d in %s, %d\n", control, __FILE__, __LINE__); /* treat as disagree, pass thru */ case TRANS_DISAGREE: @@ -968,7 +1354,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record return TRANS_ABORT; #ifdef CACHE /* clear objects from prefetch cache */ - cleanPCache(record); + //cleanPCache(); #endif } else if(transagree == pilecount) { /* Send Commit */ @@ -987,7 +1373,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record * available and copies the object and its header to the local * cache. */ -void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { +void *getRemoteObj(unsigned int mnum, unsigned int oid) { int size, val; struct sockaddr_in serv_addr; char machineip[16]; @@ -1009,11 +1395,14 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { } else { /* Read object if found into local cache */ recv_data(sd, &size, sizeof(int)); - objcopy = objstrAlloc(&record->cache, size); + objcopy = objstrAlloc(&t_cache, size); recv_data(sd, objcopy, size); STATUS(objcopy)=0; /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, oid, objcopy); + t_chashInsert(oid, objcopy); +#ifdef TRANSSTATS + totalObjSize += size; +#endif } return objcopy; @@ -1043,7 +1432,6 @@ void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigne //Keep track of what is locked oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj)); - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } else { //A lock is acquired some place else @@ -1053,7 +1441,6 @@ void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigne (*v_nomatch)++; /* Send TRANS_DISAGREE to Coordinator */ *getReplyCtrl = TRANS_DISAGREE; - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } @@ -1083,7 +1470,6 @@ void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsign *getReplyCtrl = TRANS_DISAGREE; //Keep track of what is locked oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj)); - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } else { //Has reached max number of readers or some other transaction @@ -1094,7 +1480,6 @@ void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsign (*v_nomatch)++; /* Send TRANS_DISAGREE to Coordinator */ *getReplyCtrl = TRANS_DISAGREE; - //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj)); return; } } @@ -1131,7 +1516,7 @@ int transAbortProcess(trans_commit_data_t *transinfo) { } /*This function completes the COMMIT process if the transaction is commiting*/ -int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec) { +int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) { objheader_t *header, *tcptr; int i, nummod, tmpsize, numcreated, numlocked; unsigned int *oidmod, *oidcreated, *oidlocked; @@ -1150,7 +1535,7 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra return 1; } /* Copy from transaction cache -> main object store */ - if ((tcptr = ((objheader_t *) chashSearch(rec->lookupTable, oidmod[i]))) == NULL) { + if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) { printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__); return 1; } @@ -1172,7 +1557,7 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra } /* If object is newly created inside transaction then commit it */ for (i = 0; i < numcreated; i++) { - if ((header = ((objheader_t *) chashSearch(rec->lookupTable, oidcreated[i]))) == NULL) { + if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) { printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__); return 1; } @@ -1211,55 +1596,79 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra return 0; } -prefetchpile_t *foundLocal(char *ptr) { - int siteid = *(GET_SITEID(ptr)); - int ntuples = *(GET_NTUPLES(ptr)); - unsigned int * oidarray = GET_PTR_OID(ptr); - unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples); - short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples); +prefetchpile_t *foundLocal(char *ptr, int numprefetches, int mysiteid) { + int i; + int j; prefetchpile_t * head=NULL; - int numLocal = 0; - int i; - for(i=0; i 1) { + return 0; + } + } } else { return 0; } - if(TYPE(header) > NUMCLASSES) { + if(TYPE(header) >= NUMCLASSES) { int elementsize = classsize[TYPE(header)]; struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); int length = ao->___length___; @@ -1305,7 +1720,8 @@ void *transPrefetch(void *t) { void *node=gettail(); /* Check if the tuples are found locally, if yes then reduce them further*/ /* and group requests by remote machine ids by calling the makePreGroups() */ - prefetchpile_t *pilehead = foundLocal(node); + int count=numavailable(); + prefetchpile_t *pilehead = foundLocal(node, count, 0); if (pilehead!=NULL) { // Get sock from shared pool @@ -1313,9 +1729,10 @@ void *transPrefetch(void *t) { /* Send Prefetch Request */ prefetchpile_t *ptr = pilehead; while(ptr != NULL) { - int sd = getSock2(transPrefetchSockPool, ptr->mid); - sendPrefetchReq(ptr, sd); - ptr = ptr->next; + globalid++; + int sd = getSock2(transPrefetchSockPool, ptr->mid); + sendPrefetchReq(ptr, sd,globalid); + ptr = ptr->next; } /* Release socket */ @@ -1325,7 +1742,7 @@ void *transPrefetch(void *t) { mcdealloc(pilehead); } // Deallocate the prefetch queue pile node - inctail(); + incmulttail(count); } } @@ -1358,54 +1775,83 @@ void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) { return; } -void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { +/** + * parameters: mcpilenode -> pile node to traverse to assemble pref requests + * sd -> socket id + * gid -> global identifier for each prefetch request sent, starts with 0 + **/ +void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd, int gid) { int len, endpair; char control; objpile_t *tmp; + struct writestruct writebuffer; + writebuffer.offset=0; + /* Send TRANS_PREFETCH control message */ - control = TRANS_PREFETCH; - send_data(sd, &control, sizeof(char)); + int first=1; /* Send Oids and offsets in pairs */ tmp = mcpilenode->objpiles; while(tmp != NULL) { - len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); - char oidnoffset[len]; + len = sizeof(int)+sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + char oidnoffset[len+5]; char *buf=oidnoffset; + if (first) { + *buf=TRANS_PREFETCH; + buf++;len++; + first=0; + } *((int*)buf) = tmp->numoffset; buf+=sizeof(int); *((unsigned int *)buf) = tmp->oid; + LOGOIDTYPE("S",tmp->oid,tmp->numoffset,myrdtsc()); +#ifdef TRANSSTATS + sendRemoteReq++; +#endif buf+=sizeof(unsigned int); *((unsigned int *)buf) = myIpAddr; - buf += sizeof(unsigned int); + buf+= sizeof(unsigned int); + *((int*)buf) = gid; + buf+=sizeof(int); memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short)); - send_data(sd, oidnoffset, len); tmp = tmp->next; + if (tmp==NULL) { + *((int *)(&oidnoffset[len]))=-1; + len+=sizeof(int); + } + if (tmp!=NULL) + send_buf(sd, &writebuffer, oidnoffset, len); + else + forcesend_buf(sd, &writebuffer, oidnoffset, len); } - - /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */ - endpair = -1; - send_data(sd, &endpair, sizeof(int)); - + LOGOIDTYPE("SREQ",0,0,myrdtsc()); + LOGEVENT('S'); + LOGTIME('S',0,0,myrdtsc(),gid); //after sending return; } -int getPrefetchResponse(int sd) { - int length = 0, size = 0; +int getPrefetchResponse(int sd, struct readstruct *readbuffer) { + int gid,length = 0, size = 0; char control; unsigned int oid; void *modptr, *oldptr; - recv_data((int)sd, &length, sizeof(int)); + recv_data_buf(sd, readbuffer, &length, sizeof(int)); size = length - sizeof(int); char recvbuffer[size]; - - recv_data((int)sd, recvbuffer, size); +#ifdef TRANSSTATS + getResponse++; + LOGEVENT('Z'); + LOGTIME('K',0,0, myrdtsc(),0); //log time after first recv +#endif + recv_data_buf(sd, readbuffer, recvbuffer, size); control = *((char *) recvbuffer); if(control == OBJECT_FOUND) { oid = *((unsigned int *)(recvbuffer + sizeof(char))); - size = size - (sizeof(char) + sizeof(unsigned int)); + gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int))); + LOGTIME('G',oid,0, myrdtsc(),gid); //log time after first recv + size = size - (sizeof(char) + sizeof(unsigned int) + sizeof(int)); pthread_mutex_lock(&prefetchcache_mutex); if ((modptr = prefetchobjstrAlloc(size)) == NULL) { printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); @@ -1413,28 +1859,27 @@ int getPrefetchResponse(int sd) { return -1; } pthread_mutex_unlock(&prefetchcache_mutex); - memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size); + memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int)+sizeof(int), size); STATUS(modptr)=0; + /* Insert the oid and its address into the prefetch hash lookup table */ /* Do a version comparison if the oid exists */ if((oldptr = prehashSearch(oid)) != NULL) { /* If older version then update with new object ptr */ - if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) { - prehashRemove(oid); - prehashInsert(oid, modptr); + if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) { + prehashInsert(oid, modptr); } } else { /* Else add the object ptr to hash table*/ prehashInsert(oid, modptr); } - /* Lock the Prefetch Cache look up table*/ - pthread_mutex_lock(&pflookup.lock); - /* Broadcast signal on prefetch cache condition variable */ - pthread_cond_broadcast(&pflookup.cond); - /* Unlock the Prefetch Cache look up table*/ - pthread_mutex_unlock(&pflookup.lock); + LOGOIDTYPE("GR",oid, TYPE(modptr),myrdtsc()); + LOGTIME('Z',oid, TYPE(modptr), myrdtsc(),gid); //log time after copying it into the prefetch cache } else if(control == OBJECT_NOT_FOUND) { oid = *((unsigned int *)(recvbuffer + sizeof(char))); + gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int))); + LOGOIDTYPE("NF",oid,0,myrdtsc()); + LOGTIME('F',oid, 0, myrdtsc(),gid); //log time after copying it into the prefetch cache /* TODO: For each object not found query DHT for new location and retrieve the object */ /* Throw an error */ //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid); @@ -1509,7 +1954,7 @@ int startRemoteThread(unsigned int oid, unsigned int mid) { int sock; struct sockaddr_in remoteAddr; char msg[1 + sizeof(unsigned int)]; - int bytesSent; + //int bytesSent; int status; if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) { @@ -1646,7 +2091,7 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n struct sockaddr_in remoteAddr; char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)]; char *ptr; - int bytesSent; + //int bytesSent; int status, size; unsigned short version; unsigned int oid,mid; @@ -1776,7 +2221,8 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { unsigned int mid; struct sockaddr_in remoteAddr; char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)]; - int sock, status, size, bytesSent; + int sock, status, size; + //int bytesSent; while(*head != NULL) { ptr = *head; @@ -1818,13 +2264,12 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { return status; } -void transAbort(transrecord_t *trans) { +void transAbort() { #ifdef ABORTREADERS - removetransactionhash(trans->lookupTable, trans); + removetransactionhash(); #endif - objstrDelete(trans->cache); - chashDelete(trans->lookupTable); - free(trans); + objstrDelete(t_cache); + t_chashDelete(); } /* This function inserts necessary information into