char bigarray[16*1024*1024];
int bigindex=0;
#define LOGEVENT(x) { \
- int tmp=bigindex++; \
- bigarray[tmp]=x; \
- }
+ int tmp=bigindex++; \
+ bigarray[tmp]=x; \
+}
#else
#define LOGEVENT(x)
#endif
long long bigarray4[6*1024*1024];
int bigarray5[6*1024*1024];
int bigindex1=0;
-#define LOGTIME(x,y,z,a,b) {\
- int tmp=bigindex1; \
- bigarray1[tmp]=x; \
- bigarray2[tmp]=y; \
- bigarray3[tmp]=z; \
- bigarray4[tmp]=a; \
- bigarray5[tmp]=b; \
- bigindex1++; \
+#define LOGTIME(x,y,z,a,b) { \
+ int tmp=bigindex1; \
+ bigarray1[tmp]=x; \
+ bigarray2[tmp]=y; \
+ bigarray3[tmp]=z; \
+ bigarray4[tmp]=a; \
+ bigarray5[tmp]=b; \
+ bigindex1++; \
}
#else
#define LOGTIME(x,y,z,a,b)
int recvw(int fd, void *buf, int len, int flags) {
return recv(fd, buf, len, flags);
}
-
+
void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
char *buf=(char *)buffer;
int numbytes=readbuffer->head-readbuffer->tail;
recv_data(fd, buf, buflen);
return;
}
-
+
int maxbuf=MAXBUF;
int obufflen=buflen;
readbuffer->head=0;
-
+
while (buflen > 0) {
int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
if (numbytes == -1) {
int maxbuf=MAXBUF;
int obufflen=buflen;
readbuffer->head=0;
-
+
while (buflen > 0) {
int numbytes = recvw(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
if (numbytes ==0) {
inline int arrayLength(int *array) {
int i;
- for(i=0 ; array[i] != -1; i++)
+ for(i=0; array[i] != -1; i++)
;
return i;
}
int attempted=0;
char *node;
do {
- node=getmemory(qnodesize);
- if (node==NULL&&attempted)
- break;
- if (node!=NULL) {
+ node=getmemory(qnodesize);
+ if (node==NULL&&attempted)
+ break;
+ if (node!=NULL) {
#else
char *node=getmemory(qnodesize);
#endif
- int top=endoffsets[ntuples-1];
+ int top=endoffsets[ntuples-1];
- if (node==NULL) {
- LOGEVENT('D');
- return;
- }
- /* Set queue node values */
+ if (node==NULL) {
+ LOGEVENT('D');
+ return;
+ }
+ /* Set queue node values */
- /* TODO: Remove this after testing */
- evalPrefetch[siteid].callcount++;
+ /* TODO: Remove this after testing */
+ evalPrefetch[siteid].callcount++;
- *((int *)(node))=siteid;
- *((int *)(node + sizeof(int))) = ntuples;
- len = 2*sizeof(int);
- memcpy(node+len, oids, ntuples*sizeof(unsigned int));
- memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
- memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
+ *((int *)(node))=siteid;
+ *((int *)(node + sizeof(int))) = ntuples;
+ len = 2*sizeof(int);
+ memcpy(node+len, oids, ntuples*sizeof(unsigned int));
+ memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
+ memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
#ifdef INLINEPREFETCH
- movehead(qnodesize);
- }
- int numpref=numavailable();
- attempted=1;
+ movehead(qnodesize);
+ }
+ int numpref=numavailable();
+ attempted=1;
+
+ if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
+ node=gettail();
+ prefetchpile_t *pilehead = foundLocal(node,numpref,siteid);
+ if (pilehead!=NULL) {
+ // Get sock from shared pool
+
+ /* Send Prefetch Request */
+ prefetchpile_t *ptr = pilehead;
+ while(ptr != NULL) {
+ globalid++;
+ int sd = getSock2(transPrefetchSockPool, ptr->mid);
+ sendPrefetchReq(ptr, sd, globalid);
+ ptr = ptr->next;
+ }
- if (node==NULL && numpref!=0 || numpref>=PREFTHRESHOLD) {
- node=gettail();
- prefetchpile_t *pilehead = foundLocal(node,numpref,siteid);
- if (pilehead!=NULL) {
- // Get sock from shared pool
-
- /* Send Prefetch Request */
- prefetchpile_t *ptr = pilehead;
- while(ptr != NULL) {
- globalid++;
- int sd = getSock2(transPrefetchSockPool, ptr->mid);
- sendPrefetchReq(ptr, sd, globalid);
- ptr = ptr->next;
+ mcdealloc(pilehead);
}
-
- mcdealloc(pilehead);
- }
- resetqueue();
- }//end do prefetch if condition
+ resetqueue();
+ } //end do prefetch if condition
} while(node==NULL);
#else
- /* Lock and insert into primary prefetch queue */
- movehead(qnodesize);
+ /* Lock and insert into primary prefetch queue */
+ movehead(qnodesize);
#endif
}
int udpfd;
if (processConfigFile() != 0)
- return 0; //TODO: return error value, cause main program to exit
+ return 0; //TODO: return error value, cause main program to exit
#ifdef COMPILER
if (!master)
threadcount--;
#endif
}
-// Search for an address for a given oid
+// Search for an address for a given oid
/*#define INLINE inline __attribute__((always_inline))
-INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
- //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE
- chashlistnode_t *node = &table->table[(key & table->mask)>>1];
+ INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
+ //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE
+ chashlistnode_t *node = &table->table[(key & table->mask)>>1];
- do {
+ do {
if(node->key == key) {
return node->val;
}
node = node->next;
- } while(node != NULL);
+ } while(node != NULL);
- return NULL;
- }*/
+ return NULL;
+ }*/
if(oid == 0) {
return NULL;
}
-
+
node= &c_table[(oid & c_mask)>>1];
do {
if(node->key == oid) {
#ifdef TRANSSTATS
- nchashSearch++;
+ nchashSearch++;
#endif
#ifdef COMPILER
- return &((objheader_t*)node->val)[1];
+ return &((objheader_t*)node->val)[1];
#else
- return node->val;
+ return node->val;
#endif
}
node = node->next;
} while(node != NULL);
-
- /*
- if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
-#ifdef TRANSSTATS
- nchashSearch++;
-#endif
-#ifdef COMPILER
- return &objheader[1];
-#else
- return objheader;
-#endif
- } else
- */
+
+ /*
+ if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
+ #ifdef TRANSSTATS
+ nchashSearch++;
+ #endif
+ #ifdef COMPILER
+ return &objheader[1];
+ #else
+ return objheader;
+ #endif
+ } else
+ */
#ifdef ABORTREADERS
if (t_abort) {
if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
if(STATUS(tmp) & DIRTY) {
#ifdef TRANSSTATS
- ndirtyCacheObj++;
+ ndirtyCacheObj++;
#endif
- goto remoteread;
+ goto remoteread;
}
#ifdef TRANSSTATS
nprehashSearch++;
int size;
GETSIZE(size, objcopy);
if((headerObj = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
- printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
- __FILE__, __LINE__);
- pthread_mutex_unlock(&prefetchcache_mutex);
- return NULL;
+ printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
+ __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ return NULL;
}
pthread_mutex_unlock(&prefetchcache_mutex);
memcpy(headerObj, objcopy, size+sizeof(objheader_t));
if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
if(STATUS(tmp) & DIRTY) {
#ifdef TRANSSTATS
- ndirtyCacheObj++;
+ ndirtyCacheObj++;
#endif
- goto remoteread;
+ goto remoteread;
}
#ifdef TRANSSTATS
LOGEVENT('P')
}
objcopy = getRemoteObj(machinenumber, oid);
#ifdef TRANSSTATS
- LOGEVENT('R');
- nRemoteSend++;
+ LOGEVENT('R');
+ nRemoteSend++;
#endif
if(objcopy == NULL) {
int size;
GETSIZE(size, objcopy);
if((headerObj = prefetchobjstrAlloc(size+sizeof(objheader_t))) == NULL) {
- printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
- __FILE__, __LINE__);
- pthread_mutex_unlock(&prefetchcache_mutex);
- return NULL;
+ printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
+ __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ return NULL;
}
pthread_mutex_unlock(&prefetchcache_mutex);
memcpy(headerObj, objcopy, size+sizeof(objheader_t));
/* Represents number of bins in the chash table */
unsigned int size = c_size;
- for(i = 0; i < size ; i++) {
+ for(i = 0; i < size; i++) {
chashlistnode_t * curr = &ptr[i];
/* Inner loop to traverse the linked list of the cache lookupTable */
while(curr != NULL) {
/* Represents number of bins in the chash table */
unsigned int size = c_size;
- for(i = 0; i < size ; i++) {
- struct chashentry * curr = & ptr[i];
+ for(i = 0; i < size; i++) {
+ struct chashentry * curr = &ptr[i];
/* Inner loop to traverse the linked list of the cache lookupTable */
//if the first bin in hash table is empty
if(curr->key == 0)
#ifdef LOGEVENTS
int iii;
- for(iii=0;iii<bigindex;iii++) {
+ for(iii=0; iii<bigindex; iii++) {
printf("%c", bigarray[iii]);
}
#endif
int treplyretryCount = 0;
/* Initialize timeout for exponential delay */
exponential_backoff.tv_sec = 0;
- exponential_backoff.tv_nsec = (long)(10000);//10 microsec
+ exponential_backoff.tv_nsec = (long)(10000); //10 microsec
count_exponential_backoff = 0;
do {
treplyretry = 0;
int socklist[pilecount];
char getReplyCtrl[pilecount];
int loopcount;
- for(loopcount = 0 ; loopcount < pilecount; loopcount++){
+ for(loopcount = 0; loopcount < pilecount; loopcount++) {
socklist[loopcount] = 0;
getReplyCtrl[loopcount] = 0;
}
}
int offset = 0;
int i;
- for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
+ for(i = 0; i < tosend[sockindex].f.nummod; i++) {
int size;
objheader_t *headeraddr;
if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
free(tosend);
return 1;
}
- GETSIZE(size,headeraddr);
+ GETSIZE(size,headeraddr);
size+=sizeof(objheader_t);
memcpy(modptr+offset, headeraddr, size);
offset+=size;
pile = pile->next;
} //end of pile processing
- /* Recv Ctrl msgs from all machines */
+ /* Recv Ctrl msgs from all machines */
int i;
for(i = 0; i < pilecount; i++) {
int sd = socklist[i];
GETSIZE(size, header);
size += sizeof(objheader_t);
//make an entry in prefetch hash table
- prehashInsert(oidToPrefetch, header);
- LOGEVENT('E');
+ prehashInsert(oidToPrefetch, header);
+ LOGEVENT('E');
length = length - size;
offset += size;
}
/* wait a random amount of time before retrying to commit transaction*/
if(treplyretry) {
treplyretryCount++;
- // if(treplyretryCount >= NUM_TRY_TO_COMMIT)
- // exponentialdelay();
- // else
+ // if(treplyretryCount >= NUM_TRY_TO_COMMIT)
+ // exponentialdelay();
+ // else
randomdelay();
#ifdef TRANSSTATS
nSoftAbort++;
objstrDelete(t_cache);
t_chashDelete();
#ifdef SANDBOX
- abortenabled=1;
+ abortenabled=1;
#endif
return TRANS_ABORT;
} else if(finalResponse == TRANS_COMMIT) {
transinfo->modptr = NULL;
transinfo->numlocked = numoidlocked;
transinfo->numnotfound = numoidnotfound;
-
+
/* Condition to send TRANS_AGREE */
if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
*getReplyCtrl = TRANS_AGREE;
char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
message to send */
- for (i = 0 ; i < pilecount; i++) {
+ for (i = 0; i < pilecount; i++) {
char control;
control = getReplyCtrl[i];
switch(control) {
default:
printf("Participant sent unknown message %d in %s, %d\n", control, __FILE__, __LINE__);
- /* treat as disagree, pass thru */
+ /* treat as disagree, pass thru */
case TRANS_DISAGREE:
transdisagree++;
break;
int j;
prefetchpile_t * head=NULL;
- for(j=0;j<numprefetches;j++) {
+ for(j=0; j<numprefetches; j++) {
int siteid = *(GET_SITEID(ptr));
int ntuples = *(GET_NTUPLES(ptr));
unsigned int * oidarray = GET_PTR_OID(ptr);
unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
int numLocal = 0;
-
+
for(i=0; i<ntuples; i++) {
unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
unsigned short endindex=endoffsets[i];
//Look up fields locally
int isLastOffset=0;
if(endindex==0)
- isLastOffset=1;
+ isLastOffset=1;
for(newbase=baseindex; newbase<endindex; newbase++) {
- if(newbase==(endindex-1))
- isLastOffset=1;
+ if(newbase==(endindex-1))
+ isLastOffset=1;
if (!lookupObject(&oid,arryfields[newbase],&countInvalidObj)) {
break;
}
//Add to remote requests
machinenum=lhashSearch(oid);
insertPile(machinenum, oid, siteid,endindex-newbase, &arryfields[newbase], &head);
- tuple:
+tuple:
;
}
-
+
/* handle dynamic prefetching */
handleDynPrefetching(numLocal, ntuples, siteid);
ptr=((char *)&arryfields[endoffsets[ntuples-1]])+sizeof(int);
;
} else if ((header=prehashSearch(*oid))!=NULL) {
//Found in cache
- if(STATUS(header) & DIRTY) {//Read an oid that is an old entry in the cache;
+ if(STATUS(header) & DIRTY) { //Read an oid that is an old entry in the cache;
//only once because later old entries may still cause unnecessary roundtrips during prefetching
(*countInvalidObj)+=1;
if(*countInvalidObj > 1) {
- return 0;
+ return 0;
}
}
} else {
/* Send Prefetch Request */
prefetchpile_t *ptr = pilehead;
while(ptr != NULL) {
- globalid++;
- int sd = getSock2(transPrefetchSockPool, ptr->mid);
- sendPrefetchReq(ptr, sd,globalid);
- ptr = ptr->next;
+ globalid++;
+ int sd = getSock2(transPrefetchSockPool, ptr->mid);
+ sendPrefetchReq(ptr, sd,globalid);
+ ptr = ptr->next;
}
/* Release socket */
char *buf=oidnoffset;
if (first) {
*buf=TRANS_PREFETCH;
- buf++;len++;
+ buf++; len++;
first=0;
}
*((int*)buf) = tmp->numoffset;
if((oldptr = prehashSearch(oid)) != NULL) {
/* If older version then update with new object ptr */
if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
- prehashInsert(oid, modptr);
+ prehashInsert(oid, modptr);
}
} else { /* Else add the object ptr to hash table*/
prehashInsert(oid, modptr);
/* Send array of oids */
size = sizeof(unsigned int);
- for(i = 0;i < numoid; i++) {
+ for(i = 0; i < numoid; i++) {
oid = oidarry[i];
*((unsigned int *)(&msg[1] + size)) = oid;
size += sizeof(unsigned int);
}
/* Send array of version */
- for(i = 0;i < numoid; i++) {
+ for(i = 0; i < numoid; i++) {
version = versionarry[i];
*((unsigned short *)(&msg[1] + size)) = version;
size += sizeof(unsigned short);
// relocate the position of myIp pile to end of list
plistnode_t *sortPiles(plistnode_t *pileptr) {
- plistnode_t *ptr, *tail;
- tail = pileptr;
+ plistnode_t *ptr, *tail;
+ tail = pileptr;
ptr = NULL;
- /* Get tail pointer and myIp pile ptr */
+ /* Get tail pointer and myIp pile ptr */
if(pileptr == NULL)
return pileptr;
- while(tail->next != NULL) {
+ while(tail->next != NULL) {
if(tail->mid == myIpAddr)
ptr = tail;
- tail = tail->next;
- }
+ tail = tail->next;
+ }
// if ptr is null, then myIp pile is already at tail
if(ptr != NULL) {
- /* Arrange local machine processing at the end of the pile list */
+ /* Arrange local machine processing at the end of the pile list */
tail->next = pileptr;
pileptr = ptr->next;
ptr->next = NULL;