#include "dstm.h"
+#include "debugmacro.h"
#include "ip.h"
#include "machinepile.h"
-#include "mlookup.h"
+#include "altmlookup.h"
#include "llookup.h"
#include "plookup.h"
-#include "prelookup.h"
+#include "altprelookup.h"
#include "threadnotify.h"
#include "queue.h"
#include "addUdpEnhance.h"
#ifdef COMPILER
#include "thread.h"
#endif
+#ifdef ABORTREADERS
+#include "abortreaders.h"
+#endif
+#include "trans.h"
#define NUM_THREADS 1
-#define PREFETCH_CACHE_SIZE 1048576 //1MB
#define CONFIG_FILENAME "dstm.conf"
+//#define LOGEVENTS //turn on Logging events
+#ifdef LOGEVENTS
+char bigarray[16*1024*1024];
+int bigindex=0;
+#define LOGEVENT(x) { \
+ int tmp=bigindex++; \
+ bigarray[tmp]=x; \
+ }
+#else
+#define LOGEVENT(x)
+#endif
+
+//#define LOGTIMES
+#ifdef LOGTIMES
+char bigarray1[6*1024*1024];
+unsigned int bigarray2[6*1024*1024];
+unsigned int bigarray3[6*1024*1024];
+long long bigarray4[6*1024*1024];
+int bigarray5[6*1024*1024];
+int bigindex1=0;
+#define LOGTIME(x,y,z,a,b) {\
+ int tmp=bigindex1; \
+ bigarray1[tmp]=x; \
+ bigarray2[tmp]=y; \
+ bigarray3[tmp]=z; \
+ bigarray4[tmp]=a; \
+ bigarray5[tmp]=b; \
+ bigindex1++; \
+}
+#else
+#define LOGTIME(x,y,z,a,b)
+#endif
+
+/* Thread transaction variables */
+__thread objstr_t *t_cache;
+__thread struct ___Object___ *revertlist;
+__thread struct timespec exponential_backoff;
+__thread int count_exponential_backoff;
+__thread const int max_exponential_backoff = 1000; // safety limit
+#ifdef SANDBOX
+__thread int trans_allocation_bytes;
+#endif
+
+
+#ifdef ABORTREADERS
+__thread int t_abort;
+__thread jmp_buf aborttrans;
+#endif
+
+int globalid=0; /* This variable is a unique global identifier for a sendPrefetch request */
/* Global Variables */
extern int classsize[];
pfcstats_t *evalPrefetch;
extern int numprefetchsites; //Global variable containing number of prefetch sites
extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
-objstr_t *prefetchcache; //Global Prefetch cache
pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
unsigned int oidsPerBlock;
unsigned int oidMin;
unsigned int oidMax;
-
sockPoolHashTable_t *transReadSockPool;
sockPoolHashTable_t *transPrefetchSockPool;
sockPoolHashTable_t *transRequestSockPool;
int nchashSearch = 0;
int nmhashSearch = 0;
int nprehashSearch = 0;
+int ndirtyCacheObj = 0;
int nRemoteSend = 0;
int nSoftAbort = 0;
int bytesSent = 0;
int bytesRecv = 0;
+int totalObjSize = 0;
+int sendRemoteReq = 0;
+int getResponse = 0;
void printhex(unsigned char *, int);
-plistnode_t *createPiles(transrecord_t *);
+plistnode_t *createPiles();
plistnode_t *sortPiles(plistnode_t *pileptr);
/*******************************
}
}
+void send_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
+ if (buflen+sendbuffer->offset>WMAXBUF) {
+ send_data(fd, sendbuffer->buf, sendbuffer->offset);
+ sendbuffer->offset=0;
+ send_data(fd, buffer, buflen);
+ return;
+ }
+ memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
+ sendbuffer->offset+=buflen;
+ if (sendbuffer->offset>WTOP) {
+ send_data(fd, sendbuffer->buf, sendbuffer->offset);
+ sendbuffer->offset=0;
+ }
+}
+
+void forcesend_buf(int fd, struct writestruct * sendbuffer, void *buffer, int buflen) {
+ if (buflen+sendbuffer->offset>WMAXBUF) {
+ send_data(fd, sendbuffer->buf, sendbuffer->offset);
+ sendbuffer->offset=0;
+ send_data(fd, buffer, buflen);
+ return;
+ }
+ memcpy(&sendbuffer->buf[sendbuffer->offset], buffer, buflen);
+ sendbuffer->offset+=buflen;
+ send_data(fd, sendbuffer->buf, sendbuffer->offset);
+ sendbuffer->offset=0;
+}
+
+int recvw(int fd, void *buf, int len, int flags) {
+ return recv(fd, buf, len, flags);
+}
+
+void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
+ char *buf=(char *)buffer;
+ int numbytes=readbuffer->head-readbuffer->tail;
+ if (numbytes>buflen)
+ numbytes=buflen;
+ if (numbytes>0) {
+ memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
+ readbuffer->tail+=numbytes;
+ buflen-=numbytes;
+ buf+=numbytes;
+ }
+ if (buflen==0) {
+ return;
+ }
+ if (buflen>=MAXBUF) {
+ recv_data(fd, buf, buflen);
+ return;
+ }
+
+ int maxbuf=MAXBUF;
+ int obufflen=buflen;
+ readbuffer->head=0;
+
+ while (buflen > 0) {
+ int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
+ if (numbytes == -1) {
+ perror("recv");
+ exit(0);
+ }
+ bytesRecv+=numbytes;
+ buflen-=numbytes;
+ readbuffer->head+=numbytes;
+ maxbuf-=numbytes;
+ }
+ memcpy(buf,readbuffer->buf,obufflen);
+ readbuffer->tail=obufflen;
+}
+
+int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
+ char *buf=(char *)buffer;
+ //now tail<=head
+ int numbytes=readbuffer->head-readbuffer->tail;
+ if (numbytes>buflen)
+ numbytes=buflen;
+ if (numbytes>0) {
+ memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
+ readbuffer->tail+=numbytes;
+ buflen-=numbytes;
+ buf+=numbytes;
+ }
+ if (buflen==0)
+ return 1;
+
+ if (buflen>=MAXBUF) {
+ return recv_data_errorcode(fd, buf, buflen);
+ }
+
+ int maxbuf=MAXBUF;
+ int obufflen=buflen;
+ readbuffer->head=0;
+
+ while (buflen > 0) {
+ int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
+ if (numbytes ==0) {
+ return 0;
+ }
+ if (numbytes==-1) {
+ perror("recvbuf");
+ return -1;
+ }
+ bytesRecv+=numbytes;
+ buflen-=numbytes;
+ readbuffer->head+=numbytes;
+ maxbuf-=numbytes;
+ }
+ memcpy(buf,readbuffer->buf,obufflen);
+ readbuffer->tail=obufflen;
+ return 1;
+}
+
+
void recv_data(int fd, void *buf, int buflen) {
char *buffer = (char *)(buf);
int size = buflen;
int numbytes;
while (size > 0) {
- numbytes = recv(fd, buffer, size, 0);
+ numbytes = recvw(fd, buffer, size, 0);
bytesRecv = bytesRecv + numbytes;
if (numbytes == -1) {
perror("recv");
int size = buflen;
int numbytes;
while (size > 0) {
- numbytes = recv(fd, buffer, size, 0);
+ numbytes = recvw(fd, buffer, size, 0);
if (numbytes==0)
return 0;
if (numbytes == -1) {
perror("recv");
return -1;
}
+ bytesRecv+=numbytes;
buffer += numbytes;
size -= numbytes;
}
return max;
}
+#define INLINEPREFETCH
+#define PREFTHRESHOLD 0
+
/* This function is a prefetch call generated by the compiler that
* populates the shared primary prefetch queue*/
void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
/* Allocate for the queue node*/
int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
int len;
- char * node= getmemory(qnodesize);
+#ifdef INLINEPREFETCH
+ int attempted=0;
+ char *node;
+ do {
+ node=getmemory(qnodesize);
+ if (node==NULL&&attempted)
+ break;
+ if (node!=NULL) {
+#else
+ char *node=getmemory(qnodesize);
+#endif
int top=endoffsets[ntuples-1];
- if (node==NULL)
+ if (node==NULL) {
+ LOGEVENT('D');
return;
+ }
/* Set queue node values */
/* TODO: Remove this after testing */
memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
+#ifdef INLINEPREFETCH
+ movehead(qnodesize);
+ }
+ int numpref=numavailable();
+ attempted=1;
+
+ if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
+ node=gettail();
+ prefetchpile_t *pilehead = foundLocal(node,numpref,siteid);
+ if (pilehead!=NULL) {
+ // Get sock from shared pool
+
+ /* Send Prefetch Request */
+ prefetchpile_t *ptr = pilehead;
+ while(ptr != NULL) {
+ globalid++;
+ int sd = getSock2(transPrefetchSockPool, ptr->mid);
+ sendPrefetchReq(ptr, sd, globalid);
+ ptr = ptr->next;
+ }
+
+ mcdealloc(pilehead);
+ }
+ resetqueue();
+ }//end do prefetch if condition
+ } while(node==NULL);
+#else
/* Lock and insert into primary prefetch queue */
movehead(qnodesize);
+#endif
}
/* This function starts up the transaction runtime. */
printf("Trans stats is on\n");
fflush(stdout);
#endif
+#ifdef ABORTREADERS
+ initreaderlist();
+#endif
//Initialize socket pool
transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
void transInit() {
//Create and initialize prefetch cache structure
#ifdef CACHE
- prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
initializePCache();
if((evalPrefetch = initPrefetchStats()) == NULL) {
printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
pthread_mutex_init(&atomicObjLock, NULL);
#ifdef CACHE
//Create prefetch cache lookup table
- if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
+ if(prehashCreate(PHASH_SIZE, PLOADFACTOR)) {
printf("ERROR\n");
return; //Failure
}
retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
} while(retval!=0);
#else
+#ifndef INLINEPREFETCH
do {
retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
} while(retval!=0);
#endif
+#endif
+#ifndef INLINEPREFETCH
pthread_detach(tPrefetch);
#endif
+#endif
}
/* This function stops the threads spawned */
return;
}
-/* This function initializes things required in the transaction start*/
-transrecord_t *transStart() {
- transrecord_t *tmp;
- if((tmp = calloc(1, sizeof(transrecord_t))) == NULL) {
- printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
- return NULL;
+void exponentialdelay() {
+ exponential_backoff.tv_nsec = exponential_backoff.tv_nsec * 2;
+ nanosleep(&exponential_backoff, NULL);
+ ++count_exponential_backoff;
+ if (count_exponential_backoff >= max_exponential_backoff) {
+ printf(" reached max_exponential_backoff at %s, %s(), %d\n", __FILE__, __func__, __LINE__);
+ exit(-1);
}
- tmp->cache = objstrCreate(1048576);
- tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
-#ifdef COMPILER
- tmp->revertlist=NULL;
+ return;
+}
+
+/* This function initializes things required in the transaction start*/
+void transStart() {
+ t_cache = objstrCreate(1048576);
+ t_chashCreate(CHASH_SIZE, CLOADFACTOR);
+ revertlist=NULL;
+#ifdef SANDBOX
+ trans_allocation_bytes = 0;
+#endif
+#ifdef ABORTREADERS
+ t_abort=0;
#endif
- return tmp;
}
// Search for an address for a given oid
}*/
+
+
/* This function finds the location of the objects involved in a transaction
* and returns the pointer to the object if found in a remote location */
-__attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int oid) {
+__attribute__((pure)) objheader_t *transRead(unsigned int oid) {
unsigned int machinenumber;
objheader_t *tmp, *objheader;
objheader_t *objcopy;
int size;
void *buf;
chashlistnode_t *node;
- chashtable_t *table=record->lookupTable;
if(oid == 0) {
return NULL;
}
- node= &table->table[(oid & table->mask)>>1];
+
+ node= &c_table[(oid & c_mask)>>1];
do {
if(node->key == oid) {
#ifdef TRANSSTATS
*/
#ifdef ABORTREADERS
- if (trans->abort) {
+ if (t_abort) {
//abort this transaction
- longjmp(trans->aborttrans,1);
+ removetransactionhash();
+ objstrDelete(t_cache);
+ t_chashDelete();
+ _longjmp(aborttrans,1);
} else
- addtransaction(oid,record);
+ addtransaction(oid);
#endif
if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
/* Look up in machine lookup table and copy into cache*/
GETSIZE(size, objheader);
size += sizeof(objheader_t);
- objcopy = (objheader_t *) objstrAlloc(record->cache, size);
+ objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
memcpy(objcopy, objheader, size);
/* Insert into cache's lookup table */
STATUS(objcopy)=0;
- chashInsert(record->lookupTable, OID(objheader), objcopy);
+ t_chashInsert(OID(objheader), objcopy);
#ifdef COMPILER
return &objcopy[1];
#else
} else {
#ifdef CACHE
if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+ if(STATUS(tmp) & DIRTY) {
+#ifdef TRANSSTATS
+ ndirtyCacheObj++;
+#endif
+ goto remoteread;
+ }
#ifdef TRANSSTATS
nprehashSearch++;
#endif
/* Look up in prefetch cache */
GETSIZE(size, tmp);
size+=sizeof(objheader_t);
- objcopy = (objheader_t *) objstrAlloc(record->cache, size);
+ objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
memcpy(objcopy, tmp, size);
/* Insert into cache's lookup table */
- chashInsert(record->lookupTable, OID(tmp), objcopy);
+ t_chashInsert(OID(tmp), objcopy);
#ifdef COMPILER
return &objcopy[1];
#else
return objcopy;
#endif
}
+remoteread:
#endif
/* Get the object from the remote location */
if((machinenumber = lhashSearch(oid)) == 0) {
printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
return NULL;
}
- objcopy = getRemoteObj(record, machinenumber, oid);
+ objcopy = getRemoteObj(machinenumber, oid);
if(objcopy == NULL) {
printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
#ifdef TRANSSTATS
nRemoteSend++;
#endif
- STATUS(objcopy)=0;
#ifdef COMPILER
+#ifdef CACHE
+ //Copy object to prefetch cache
+ pthread_mutex_lock(&prefetchcache_mutex);
+ objheader_t *headerObj;
+ int size;
+ GETSIZE(size, objcopy);
+ if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
+ printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
+ __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ return NULL;
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ memcpy(headerObj, objcopy, size+sizeof(objheader_t));
+ //make an entry in prefetch lookup hashtable
+ prehashInsert(oid, headerObj);
+ LOGEVENT('B');
+#endif
+ return &objcopy[1];
+#else
+ return objcopy;
+#endif
+ }
+ }
+}
+
+
+/* This function finds the location of the objects involved in a transaction
+ * and returns the pointer to the object if found in a remote location */
+__attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
+//DEBUG: __attribute__((pure)) objheader_t *transRead2(unsigned int oid, char tmpptr[]) {
+ unsigned int machinenumber;
+ objheader_t *tmp, *objheader;
+ objheader_t *objcopy;
+ int size;
+
+#ifdef ABORTREADERS
+ if (t_abort) {
+ //abort this transaction
+ removetransactionhash();
+ objstrDelete(t_cache);
+ t_chashDelete();
+ _longjmp(aborttrans,1);
+ } else
+ addtransaction(oid);
+#endif
+
+ if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
+#ifdef TRANSSTATS
+ nmhashSearch++;
+#endif
+ /* Look up in machine lookup table and copy into cache*/
+ GETSIZE(size, objheader);
+ size += sizeof(objheader_t);
+ objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
+ memcpy(objcopy, objheader, size);
+ /* Insert into cache's lookup table */
+ STATUS(objcopy)=0;
+ t_chashInsert(OID(objheader), objcopy);
+#ifdef COMPILER
+ return &objcopy[1];
+#else
+ return objcopy;
+#endif
+ } else {
+#ifdef CACHE
+ if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+ if(STATUS(tmp) & DIRTY) {
+#ifdef TRANSSTATS
+ ndirtyCacheObj++;
+#endif
+ goto remoteread;
+ }
+#ifdef TRANSSTATS
+ LOGEVENT('P')
+ nprehashSearch++;
+#endif
+ /* Look up in prefetch cache */
+ GETSIZE(size, tmp);
+ size+=sizeof(objheader_t);
+ objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
+ memcpy(objcopy, tmp, size);
+ LOGOIDTYPE("P",oid, TYPE(objcopy), myrdtsc());
+ /* Insert into cache's lookup table */
+ t_chashInsert(OID(tmp), objcopy);
+#ifdef COMPILER
+ return &objcopy[1];
+#else
+ return objcopy;
+#endif
+ }
+remoteread:
+#endif
+ /* Get the object from the remote location */
+ if((machinenumber = lhashSearch(oid)) == 0) {
+ printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
+ return NULL;
+ }
+ objcopy = getRemoteObj(machinenumber, oid);
+#ifdef TRANSSTATS
+ LOGEVENT('R');
+ nRemoteSend++;
+#endif
+
+ if(objcopy == NULL) {
+ printf("Error: Object %u not found in Remote location %s, %d\n", oid,__FILE__, __LINE__);
+ return NULL;
+ } else {
+#ifdef COMPILER
+#ifdef CACHE
+ LOGOIDTYPE("RR",oid, TYPE(objcopy),myrdtsc());
+ LOGTIME('r', oid, TYPE(objcopy),myrdtsc(),0);
+ //Copy object to prefetch cache
+ pthread_mutex_lock(&prefetchcache_mutex);
+ objheader_t *headerObj;
+ int size;
+ GETSIZE(size, objcopy);
+ if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) {
+ printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
+ __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ return NULL;
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ memcpy(headerObj, objcopy, size+sizeof(objheader_t));
+ //make an entry in prefetch lookup hashtable
+ prehashInsert(oid, headerObj);
+ LOGEVENT('B');
+#endif
return &objcopy[1];
#else
return objcopy;
}
/* This function creates objects in the transaction record */
-objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
- objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
+objheader_t *transCreateObj(unsigned int size) {
+ objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
OID(tmp) = getNewOID();
tmp->version = 1;
tmp->rcount = 1;
STATUS(tmp) = NEW;
- chashInsert(record->lookupTable, OID(tmp), tmp);
+ t_chashInsert(OID(tmp), tmp);
+#ifdef SANDBOX
+ trans_allocation_bytes += size;
+ /* Validate the read set if allocation is exceeds threshold */
+ if(trans_allocation_bytes > MEM_ALLOC_THRESHOLD) {
+ check_mem_alloc();
+ }
+#endif
#ifdef COMPILER
return &tmp[1]; //want space after object header
#if 1
/* This function creates machine piles based on all machines involved in a
* transaction commit request */
-plistnode_t *createPiles(transrecord_t *record) {
+plistnode_t *createPiles() {
int i;
plistnode_t *pile = NULL;
unsigned int machinenum;
objheader_t *headeraddr;
- chashlistnode_t * ptr = record->lookupTable->table;
+ chashlistnode_t * ptr = c_table;
/* Represents number of bins in the chash table */
- unsigned int size = record->lookupTable->size;
+ unsigned int size = c_size;
for(i = 0; i < size ; i++) {
chashlistnode_t * curr = &ptr[i];
}
//Make machine groups
- pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
+ pile = pInsert(pile, headeraddr, machinenum, c_numelements);
curr = curr->next;
}
}
#else
/* This function creates machine piles based on all machines involved in a
* transaction commit request */
-plistnode_t *createPiles(transrecord_t *record) {
+plistnode_t *createPiles() {
int i;
plistnode_t *pile = NULL;
unsigned int machinenum;
objheader_t *headeraddr;
- struct chashentry * ptr = record->lookupTable->table;
+ struct chashentry * ptr = c_table;
/* Represents number of bins in the chash table */
- unsigned int size = record->lookupTable->size;
+ unsigned int size = c_size;
for(i = 0; i < size ; i++) {
struct chashentry * curr = & ptr[i];
}
//Make machine groups
- pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
+ pile = pInsert(pile, headeraddr, machinenum, c_numelements);
}
return pile;
}
* and creates new piles by calling the createPiles(),
* Sends a transrequest() to each remote machines for objects found remotely
* and calls handleLocalReq() to process objects found locally */
-int transCommit(transrecord_t *record) {
+int transCommit() {
+ //char buffer[30];
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile, *pile_ptr;
- int trecvcount;
char treplyretry; /* keeps track of the common response that needs to be sent */
int firsttime=1;
trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
char finalResponse;
+#ifdef SANDBOX
+ abortenabled=0;
+#endif
+ struct writestruct writebuffer;
+ writebuffer.offset=0;
+
+#ifdef LOGEVENTS
+ int iii;
+ for(iii=0;iii<bigindex;iii++) {
+ printf("%c", bigarray[iii]);
+ }
+#endif
+
+#ifdef LOGTIMES
+ int jjj;
+ for(jjj=0; jjj<bigindex1; jjj++) {
+ printf("[%c %u %u %lld %d]\n", bigarray1[jjj], bigarray2[jjj], bigarray3[jjj], bigarray4[jjj], bigarray5[jjj]);
+ }
+#endif
+
+#ifdef ABORTREADERS
+ if (t_abort) {
+ //abort this transaction
+ removetransactionhash();
+ objstrDelete(t_cache);
+ t_chashDelete();
+ return 1;
+ }
+#endif
+
+ int treplyretryCount = 0;
+ /* Initialize timeout for exponential delay */
+ exponential_backoff.tv_sec = 0;
+ exponential_backoff.tv_nsec = (long)(10000);//10 microsec
+ count_exponential_backoff = 0;
do {
- trecvcount = 0;
treplyretry = 0;
/* Look through all the objects in the transaction record and make piles
* for each machine involved in the transaction*/
if (firsttime) {
- pile_ptr = pile = createPiles(record);
+ pile_ptr = pile = createPiles();
pile_ptr = pile = sortPiles(pile);
} else {
pile = pile_ptr;
/* Create a socket and getReplyCtrl array, initialize */
int socklist[pilecount];
+ char getReplyCtrl[pilecount];
int loopcount;
- for(loopcount = 0 ; loopcount < pilecount; loopcount++)
+ for(loopcount = 0 ; loopcount < pilecount; loopcount++){
socklist[loopcount] = 0;
- char getReplyCtrl[pilecount];
- for(loopcount = 0 ; loopcount < pilecount; loopcount++)
getReplyCtrl[loopcount] = 0;
+ }
/* Process each machine pile */
int sockindex = 0;
}
socklist[sockindex] = sd;
/* Send bytes of data with TRANS_REQUEST control message */
- send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+ send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t));
/* Send list of machines involved in the transaction */
{
int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
- send_data(sd, tosend[sockindex].listmid, size);
+ send_buf(sd, &writebuffer, tosend[sockindex].listmid, size);
}
/* Send oids and version number tuples for objects that are read */
{
int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
- send_data(sd, tosend[sockindex].objread, size);
+ send_buf(sd, &writebuffer, tosend[sockindex].objread, size);
}
/* Send objects that are modified */
for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
int size;
objheader_t *headeraddr;
- if((headeraddr = chashSearch(record->lookupTable, tosend[sockindex].oidmod[i])) == NULL) {
+ if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
free(modptr);
free(listmid);
memcpy(modptr+offset, headeraddr, size);
offset+=size;
}
- send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+ forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
free(modptr);
} else { //handle request locally
- handleLocalReq(&tosend[sockindex], &transinfo, record, &getReplyCtrl[sockindex]);
+ handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
}
sockindex++;
pile = pile->next;
} //end of pile processing
+
/* Recv Ctrl msgs from all machines */
int i;
for(i = 0; i < pilecount; i++) {
objheader_t * header;
header = (objheader_t *)(((char *)newAddr) + offset);
oidToPrefetch = OID(header);
+ STATUS(header)=0;
int size = 0;
GETSIZE(size, header);
size += sizeof(objheader_t);
//make an entry in prefetch hash table
- void *oldptr;
- if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
- prehashRemove(oidToPrefetch);
- prehashInsert(oidToPrefetch, header);
- } else {
- prehashInsert(oidToPrefetch, header);
- }
+ prehashInsert(oidToPrefetch, header);
+ LOGEVENT('E');
length = length - size;
offset += size;
}
#endif
}
}
+
/* Decide the final response */
- if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, record, pilecount)) == 0) {
+ if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
free(tosend);
free(listmid);
return 1;
}
+#ifdef CACHE
+ if (finalResponse == TRANS_COMMIT) {
+ /* Invalidate objects in other machine cache */
+ int retval;
+ if((retval = invalidateObj(tosend, pilecount,finalResponse,socklist)) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ return 1;
+ }
+ }
+#endif
/* Send responses to all machines */
for(i = 0; i < pilecount; i++) {
int sd = socklist[i];
if(finalResponse == TRANS_COMMIT) {
int retval;
/* Update prefetch cache */
- if((retval = updatePrefetchCache(&(tosend[i]), record)) != 0) {
+ if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
free(tosend);
free(listmid);
return 1;
}
- /* Invalidate objects in other machine cache */
- if(tosend[i].f.nummod > 0) {
- if((retval = invalidateObj(&(tosend[i]))) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- free(tosend);
- free(listmid);
- return 1;
- }
- }
+#ifdef ABORTREADERS
+ removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
+#endif
+ }
+#ifdef ABORTREADERS
+ else if (!treplyretry) {
+ removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
}
+#endif
#endif
send_data(sd, &finalResponse, sizeof(char));
} else {
/* Complete local processing */
- doLocalProcess(finalResponse, &(tosend[i]), &transinfo, record);
+ doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
+#ifdef ABORTREADERS
+ if(finalResponse == TRANS_COMMIT) {
+ removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+ } else if (!treplyretry) {
+ removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+ }
+#endif
}
}
pDelete(pile_ptr);
/* wait a random amount of time before retrying to commit transaction*/
if(treplyretry) {
+ treplyretryCount++;
+ // if(treplyretryCount >= NUM_TRY_TO_COMMIT)
+ // exponentialdelay();
+ // else
randomdelay();
#ifdef TRANSSTATS
nSoftAbort++;
} while (treplyretry);
if(finalResponse == TRANS_ABORT) {
- //printf("Aborting trans\n");
#ifdef TRANSSTATS
+ LOGEVENT('A');
numTransAbort++;
#endif
/* Free Resources */
- objstrDelete(record->cache);
- chashDelete(record->lookupTable);
- free(record);
+ objstrDelete(t_cache);
+ t_chashDelete();
+#ifdef SANDBOX
+ abortenabled=1;
+#endif
return TRANS_ABORT;
} else if(finalResponse == TRANS_COMMIT) {
#ifdef TRANSSTATS
+ LOGEVENT('C');
numTransCommit++;
#endif
/* Free Resources */
- objstrDelete(record->cache);
- chashDelete(record->lookupTable);
- free(record);
+ objstrDelete(t_cache);
+ t_chashDelete();
return 0;
} else {
//TODO Add other cases
/* This function handles the local objects involved in a transaction
* commiting process. It also makes a decision if this local machine
* sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
-void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec, char *getReplyCtrl) {
+void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
unsigned int *oidnotfound = NULL, *oidlocked = NULL;
int numoidnotfound = 0, numoidlocked = 0;
int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
}
int tmpsize;
objheader_t *headptr;
- headptr = (objheader_t *) chashSearch(rec->lookupTable, tdata->oidmod[i-numread]);
+ headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
if (headptr == NULL) {
printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
return;
transinfo->modptr = NULL;
transinfo->numlocked = numoidlocked;
transinfo->numnotfound = numoidnotfound;
-
+
/* Condition to send TRANS_AGREE */
if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
*getReplyCtrl = TRANS_AGREE;
}
}
-void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *record) {
+void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
if(finalResponse == TRANS_ABORT) {
if(transAbortProcess(transinfo) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
return;
}
} else if(finalResponse == TRANS_COMMIT) {
-#ifdef CACHE
- /* Invalidate objects in other machine cache */
- if(tdata->f.nummod > 0) {
- int retval;
- if((retval = invalidateObj(tdata)) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- }
-#endif
- if(transComProcess(tdata, transinfo, record) != 0) {
+ if(transComProcess(tdata, transinfo) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
fflush(stdout);
return;
/* This function decides the reponse that needs to be sent to
* all Participant machines after the TRANS_REQUEST protocol */
-char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record, int pilecount) {
+char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
message to send */
for (i = 0 ; i < pilecount; i++) {
control = getReplyCtrl[i];
switch(control) {
default:
- printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+ printf("Participant sent unknown message %d in %s, %d\n", control, __FILE__, __LINE__);
/* treat as disagree, pass thru */
case TRANS_DISAGREE:
return TRANS_ABORT;
#ifdef CACHE
/* clear objects from prefetch cache */
- cleanPCache(record);
+ //cleanPCache();
#endif
} else if(transagree == pilecount) {
/* Send Commit */
* available and copies the object and its header to the local
* cache. */
-void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
+void *getRemoteObj(unsigned int mnum, unsigned int oid) {
int size, val;
struct sockaddr_in serv_addr;
char machineip[16];
} else {
/* Read object if found into local cache */
recv_data(sd, &size, sizeof(int));
- objcopy = objstrAlloc(record->cache, size);
+ objcopy = objstrAlloc(&t_cache, size);
recv_data(sd, objcopy, size);
+ STATUS(objcopy)=0;
/* Insert into cache's lookup table */
- chashInsert(record->lookupTable, oid, objcopy);
+ t_chashInsert(oid, objcopy);
+#ifdef TRANSSTATS
+ totalObjSize += size;
+#endif
}
return objcopy;
//Keep track of what is locked
oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
} else { //A lock is acquired some place else
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
*getReplyCtrl = TRANS_DISAGREE;
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
}
*getReplyCtrl = TRANS_DISAGREE;
//Keep track of what is locked
oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
} else { //Has reached max number of readers or some other transaction
(*v_nomatch)++;
/* Send TRANS_DISAGREE to Coordinator */
*getReplyCtrl = TRANS_DISAGREE;
- //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
return;
}
}
}
/*This function completes the COMMIT process if the transaction is commiting*/
-int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec) {
+int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
objheader_t *header, *tcptr;
int i, nummod, tmpsize, numcreated, numlocked;
unsigned int *oidmod, *oidcreated, *oidlocked;
return 1;
}
/* Copy from transaction cache -> main object store */
- if ((tcptr = ((objheader_t *) chashSearch(rec->lookupTable, oidmod[i]))) == NULL) {
+ if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
}
/* If object is newly created inside transaction then commit it */
for (i = 0; i < numcreated; i++) {
- if ((header = ((objheader_t *) chashSearch(rec->lookupTable, oidcreated[i]))) == NULL) {
+ if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
return 1;
}
GETSIZE(tmpsize, header);
tmpsize += sizeof(objheader_t);
pthread_mutex_lock(&mainobjstore_mutex);
- if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
+ if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
pthread_mutex_unlock(&mainobjstore_mutex);
return 1;
return 0;
}
-prefetchpile_t *foundLocal(char *ptr) {
- int siteid = *(GET_SITEID(ptr));
- int ntuples = *(GET_NTUPLES(ptr));
- unsigned int * oidarray = GET_PTR_OID(ptr);
- unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
- short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+prefetchpile_t *foundLocal(char *ptr, int numprefetches, int mysiteid) {
+ int i;
+ int j;
prefetchpile_t * head=NULL;
- int numLocal = 0;
- int i;
- for(i=0; i<ntuples; i++) {
- unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
- unsigned short endindex=endoffsets[i];
- unsigned int oid=oidarray[i];
- int newbase;
- int machinenum;
- if (oid==0)
- continue;
- //Look up fields locally
- for(newbase=baseindex; newbase<endindex; newbase++) {
- if (!lookupObject(&oid, arryfields[newbase]))
- break;
- //Ended in a null pointer...
- if (oid==0)
+ for(j=0;j<numprefetches;j++) {
+ int siteid = *(GET_SITEID(ptr));
+ int ntuples = *(GET_NTUPLES(ptr));
+ unsigned int * oidarray = GET_PTR_OID(ptr);
+ unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
+ short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+ int numLocal = 0;
+
+ for(i=0; i<ntuples; i++) {
+ unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
+ unsigned short endindex=endoffsets[i];
+ unsigned int oid=oidarray[i];
+ int newbase;
+ int machinenum;
+ int countInvalidObj=0;
+
+ if (oid==0) {
+ numLocal++;
+ continue;
+ }
+ //Look up fields locally
+ int isLastOffset=0;
+ if(endindex==0)
+ isLastOffset=1;
+ for(newbase=baseindex; newbase<endindex; newbase++) {
+ if(newbase==(endindex-1))
+ isLastOffset=1;
+ if (!lookupObject(&oid,arryfields[newbase],&countInvalidObj)) {
+ break;
+ }
+ //Ended in a null pointer...
+ if (oid==0) {
+ numLocal++;
+ goto tuple;
+ }
+ }
+
+ //Entire prefetch is local
+ if (newbase==endindex&&checkoid(oid,isLastOffset)) {
+ numLocal++;
goto tuple;
+ }
+
+ //Add to remote requests
+ machinenum=lhashSearch(oid);
+ insertPile(machinenum, oid, siteid,endindex-newbase, &arryfields[newbase], &head);
+ tuple:
+ ;
}
- //Entire prefetch is local
- if (newbase==endindex&&checkoid(oid)) {
- numLocal++;
- goto tuple;
- }
- //Add to remote requests
- machinenum=lhashSearch(oid);
- insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
-tuple:
- ;
+
+ /* handle dynamic prefetching */
+ handleDynPrefetching(numLocal, ntuples, siteid);
+ ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
}
- /* handle dynamic prefetching */
- handleDynPrefetching(numLocal, ntuples, siteid);
return head;
}
-int checkoid(unsigned int oid) {
+int checkoid(unsigned int oid, int isLastOffset) {
objheader_t *header;
if ((header=mhashSearch(oid))!=NULL) {
//Found on machine
return 1;
} else if ((header=prehashSearch(oid))!=NULL) {
+ //if the last offset then prefetch object
+ if((STATUS(header) & DIRTY) && isLastOffset) {
+ return 0;
+ }
//Found in cache
return 1;
} else {
}
}
-int lookupObject(unsigned int * oid, short offset) {
+int lookupObject(unsigned int * oid, short offset, int *countInvalidObj) {
objheader_t *header;
if ((header=mhashSearch(*oid))!=NULL) {
//Found on machine
;
} else if ((header=prehashSearch(*oid))!=NULL) {
//Found in cache
- ;
+ if(STATUS(header) & DIRTY) {//Read an oid that is an old entry in the cache;
+ //only once because later old entries may still cause unnecessary roundtrips during prefetching
+ (*countInvalidObj)+=1;
+ if(*countInvalidObj > 1) {
+ return 0;
+ }
+ }
} else {
return 0;
}
- if(TYPE(header) > NUMCLASSES) {
+ if(TYPE(header) >= NUMCLASSES) {
int elementsize = classsize[TYPE(header)];
struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
int length = ao->___length___;
void *node=gettail();
/* Check if the tuples are found locally, if yes then reduce them further*/
/* and group requests by remote machine ids by calling the makePreGroups() */
- prefetchpile_t *pilehead = foundLocal(node);
+ int count=numavailable();
+ prefetchpile_t *pilehead = foundLocal(node, count, 0);
if (pilehead!=NULL) {
// Get sock from shared pool
- int sd = getSock2(transPrefetchSockPool, pilehead->mid);
/* Send Prefetch Request */
prefetchpile_t *ptr = pilehead;
while(ptr != NULL) {
- sendPrefetchReq(ptr, sd);
- ptr = ptr->next;
+ globalid++;
+ int sd = getSock2(transPrefetchSockPool, ptr->mid);
+ sendPrefetchReq(ptr, sd,globalid);
+ ptr = ptr->next;
}
/* Release socket */
mcdealloc(pilehead);
}
// Deallocate the prefetch queue pile node
- inctail();
+ incmulttail(count);
}
}
return;
}
-void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
+/**
+ * parameters: mcpilenode -> pile node to traverse to assemble pref requests
+ * sd -> socket id
+ * gid -> global identifier for each prefetch request sent, starts with 0
+ **/
+void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd, int gid) {
int len, endpair;
char control;
objpile_t *tmp;
+ struct writestruct writebuffer;
+ writebuffer.offset=0;
+
/* Send TRANS_PREFETCH control message */
- control = TRANS_PREFETCH;
- send_data(sd, &control, sizeof(char));
+ int first=1;
/* Send Oids and offsets in pairs */
tmp = mcpilenode->objpiles;
while(tmp != NULL) {
- len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
- char oidnoffset[len];
+ len = sizeof(int)+sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
+ char oidnoffset[len+5];
char *buf=oidnoffset;
+ if (first) {
+ *buf=TRANS_PREFETCH;
+ buf++;len++;
+ first=0;
+ }
*((int*)buf) = tmp->numoffset;
buf+=sizeof(int);
*((unsigned int *)buf) = tmp->oid;
+ LOGOIDTYPE("S",tmp->oid,tmp->numoffset,myrdtsc());
+#ifdef TRANSSTATS
+ sendRemoteReq++;
+#endif
buf+=sizeof(unsigned int);
*((unsigned int *)buf) = myIpAddr;
- buf += sizeof(unsigned int);
+ buf+= sizeof(unsigned int);
+ *((int*)buf) = gid;
+ buf+=sizeof(int);
memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
- send_data(sd, oidnoffset, len);
tmp = tmp->next;
+ if (tmp==NULL) {
+ *((int *)(&oidnoffset[len]))=-1;
+ len+=sizeof(int);
+ }
+ if (tmp!=NULL)
+ send_buf(sd, &writebuffer, oidnoffset, len);
+ else
+ forcesend_buf(sd, &writebuffer, oidnoffset, len);
}
-
- /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
- endpair = -1;
- send_data(sd, &endpair, sizeof(int));
-
+ LOGOIDTYPE("SREQ",0,0,myrdtsc());
+ LOGEVENT('S');
+ LOGTIME('S',0,0,myrdtsc(),gid); //after sending
return;
}
-int getPrefetchResponse(int sd) {
- int length = 0, size = 0;
+int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
+ int gid,length = 0, size = 0;
char control;
unsigned int oid;
void *modptr, *oldptr;
- recv_data((int)sd, &length, sizeof(int));
+ recv_data_buf(sd, readbuffer, &length, sizeof(int));
size = length - sizeof(int);
char recvbuffer[size];
-
- recv_data((int)sd, recvbuffer, size);
+#ifdef TRANSSTATS
+ getResponse++;
+ LOGEVENT('Z');
+ LOGTIME('K',0,0, myrdtsc(),0); //log time after first recv
+#endif
+ recv_data_buf(sd, readbuffer, recvbuffer, size);
control = *((char *) recvbuffer);
if(control == OBJECT_FOUND) {
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
- size = size - (sizeof(char) + sizeof(unsigned int));
+ gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
+ LOGTIME('G',oid,0, myrdtsc(),gid); //log time after first recv
+ size = size - (sizeof(char) + sizeof(unsigned int) + sizeof(int));
pthread_mutex_lock(&prefetchcache_mutex);
if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
return -1;
}
pthread_mutex_unlock(&prefetchcache_mutex);
- memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
+ memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int)+sizeof(int), size);
STATUS(modptr)=0;
+
/* Insert the oid and its address into the prefetch hash lookup table */
/* Do a version comparison if the oid exists */
if((oldptr = prehashSearch(oid)) != NULL) {
/* If older version then update with new object ptr */
- if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
- prehashRemove(oid);
- prehashInsert(oid, modptr);
+ if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
+ prehashInsert(oid, modptr);
}
} else { /* Else add the object ptr to hash table*/
prehashInsert(oid, modptr);
}
- /* Lock the Prefetch Cache look up table*/
- pthread_mutex_lock(&pflookup.lock);
- /* Broadcast signal on prefetch cache condition variable */
- pthread_cond_broadcast(&pflookup.cond);
- /* Unlock the Prefetch Cache look up table*/
- pthread_mutex_unlock(&pflookup.lock);
+ LOGOIDTYPE("GR",oid, TYPE(modptr),myrdtsc());
+ LOGTIME('Z',oid, TYPE(modptr), myrdtsc(),gid); //log time after copying it into the prefetch cache
} else if(control == OBJECT_NOT_FOUND) {
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+ gid = *((int *) (recvbuffer+sizeof(char)+sizeof(unsigned int)));
+ LOGOIDTYPE("NF",oid,0,myrdtsc());
+ LOGTIME('F',oid, 0, myrdtsc(),gid); //log time after copying it into the prefetch cache
/* TODO: For each object not found query DHT for new location and retrieve the object */
/* Throw an error */
//printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
int sock;
struct sockaddr_in remoteAddr;
char msg[1 + sizeof(unsigned int)];
- int bytesSent;
+ //int bytesSent;
int status;
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
struct sockaddr_in remoteAddr;
char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)];
char *ptr;
- int bytesSent;
+ //int bytesSent;
int status, size;
unsigned short version;
unsigned int oid,mid;
unsigned int mid;
struct sockaddr_in remoteAddr;
char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
- int sock, status, size, bytesSent;
+ int sock, status, size;
+ //int bytesSent;
while(*head != NULL) {
ptr = *head;
return status;
}
-void transAbort(transrecord_t *trans) {
- objstrDelete(trans->cache);
- chashDelete(trans->lookupTable);
- free(trans);
+void transAbort() {
+#ifdef ABORTREADERS
+ removetransactionhash();
+#endif
+ objstrDelete(t_cache);
+ t_chashDelete();
}
/* This function inserts necessary information into