#include "thread.h"
#endif
#include "gCollect.h"
+#include "readstruct.h"
#define BACKLOG 10 //max pending connections
#define RECEIVE_BUFFER_SIZE 2048
trans_commit_data_t transinfo;
unsigned short objType, *versionarry, version;
unsigned int *oidarry, numoid, mid, threadid;
+ struct readstruct readbuffer;
+ readbuffer.head=0;
+ readbuffer.tail=0;
/* Receive control messages from other machines */
while(1) {
- int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+ int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char));
if (ret==0)
break;
if (ret==-1) {
switch(control) {
case READ_REQUEST:
/* Read oid requested and search if available */
- recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+ recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
while((srcObj = mhashSearch(oid)) == NULL) {
int ret;
if((ret = sched_yield()) != 0) {
transinfo.modptr = NULL;
transinfo.numlocked = 0;
transinfo.numnotfound = 0;
- if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
+ if((val = readClientReq(&transinfo, (int)acceptfd, &readbuffer)) != 0) {
printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
pthread_exit(NULL);
}
case TRANS_PREFETCH:
#ifdef RANGEPREFETCH
- if((val = rangePrefetchReq((int)acceptfd)) != 0) {
+ if((val = rangePrefetchReq((int)acceptfd, &readbuffer)) != 0) {
printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
break;
}
#else
- if((val = prefetchReq((int)acceptfd)) != 0) {
+ if((val = prefetchReq((int)acceptfd, &readbuffer)) != 0) {
printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
break;
}
case TRANS_PREFETCH_RESPONSE:
#ifdef RANGEPREFETCH
- if((val = getRangePrefetchResponse((int)acceptfd)) != 0) {
+ if((val = getRangePrefetchResponse((int)acceptfd, &readbuffer)) != 0) {
printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
break;
}
#else
- if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+ if((val = getPrefetchResponse((int) acceptfd, &readbuffer)) != 0) {
printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
break;
}
break;
case START_REMOTE_THREAD:
- recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+ recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
objType = getObjType(oid);
startDSMthread(oid, objType);
break;
case THREAD_NOTIFY_REQUEST:
- recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
+ recv_data_buf((int)acceptfd, &readbuffer, &numoid, sizeof(unsigned int));
size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
if((buffer = calloc(1,size)) == NULL) {
printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
pthread_exit(NULL);
}
- recv_data((int)acceptfd, buffer, size);
+ recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
oidarry = calloc(numoid, sizeof(unsigned int));
memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
threadid = *((unsigned int *)(buffer+size));
processReqNotify(numoid, oidarry, versionarry, mid, threadid);
free(buffer);
-
break;
case THREAD_NOTIFY_RESPONSE:
pthread_exit(NULL);
}
- recv_data((int)acceptfd, buffer, size);
+ recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
oid = *((unsigned int *)buffer);
size = sizeof(unsigned int);
/* This function reads the information available in a transaction request
* and makes a function call to process the request */
-int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
+int readClientReq(trans_commit_data_t *transinfo, int acceptfd, struct readstruct * readbuffer) {
char *ptr;
void *modptr;
unsigned int *oidmod, oid;
size = sizeof(fixed) - 1;
ptr = (char *)&fixed;;
fixed.control = TRANS_REQUEST;
- recv_data((int)acceptfd, ptr+1, size);
+ recv_data_buf((int)acceptfd, readbuffer, ptr+1, size);
/* Read list of mids */
int mcount = fixed.mcount;
size = mcount * sizeof(unsigned int);
unsigned int listmid[mcount];
ptr = (char *) listmid;
- recv_data((int)acceptfd, ptr, size);
+ recv_data_buf((int)acceptfd, readbuffer, ptr, size);
/* Read oid and version tuples for those objects that are not modified in the transaction */
int numread = fixed.numread;
char objread[size];
if(numread != 0) { //If pile contains more than one object to be read,
// keep reading all objects
- recv_data((int)acceptfd, objread, size);
+ recv_data_buf((int)acceptfd, readbuffer, objread, size);
}
/* Read modified objects */
return 1;
}
size = fixed.sum_bytes;
- recv_data((int)acceptfd, modptr, size);
+ recv_data_buf((int)acceptfd, readbuffer, modptr, size);
}
/* Create an array of oids for modified objects */
}
/*Process the information read */
- if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
+ if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd, readbuffer)) != 0) {
printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
/* Free resources */
if(oidmod != NULL) {
* function and sends a reply to the co-ordinator.
* Following this it also receives a new control message from the co-ordinator and processes this message*/
int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
- unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
+ unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd, struct readstruct *readbuffer) {
char control, sendctrl, retval;
objheader_t *tmp_header;
return 1;
}
- recv_data((int)acceptfd, &control, sizeof(char));
+ recv_data_buf((int)acceptfd, readbuffer, &control, sizeof(char));
/* Process the new control message */
switch(control) {
case TRANS_ABORT:
* If objects are not found then record those and if objects are found
* then use offset values to prefetch references to other objects */
-int prefetchReq(int acceptfd) {
+int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
int i, size, objsize, numoffset = 0;
int length;
char *recvbuffer, control;
int sd = -1;
while(1) {
- recv_data((int)acceptfd, &numoffset, sizeof(int));
+ recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int));
if(numoffset == -1)
break;
- recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
+ recv_data_buf((int)acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int));
oid = oidmid.oid;
if (mid != oidmid.mid) {
if (mid!=-1) {
sd = getSockWithLock(transPResponseSocketPool, mid);
}
short offsetarry[numoffset];
- recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short));
+ recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short));
/*Process each oid */
if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
/* Save the oids not found in buffer for later use */
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
- char sendbuffer[size];
- *((int *) sendbuffer) = size;
- *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
- *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
- control = TRANS_PREFETCH_RESPONSE;
- sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+ *((int *) (sendbuffer+sizeof(char))) = size;
+ *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid;
+ send_data(sd, sendbuffer, size+1);
} else { /* Object Found */
- int incr = 0;
+ int incr = 1;
GETSIZE(objsize, header);
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
- char sendbuffer[size];
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
*((int *)(sendbuffer + incr)) = size;
incr += sizeof(int);
*((char *)(sendbuffer + incr)) = OBJECT_FOUND;
*((unsigned int *)(sendbuffer+incr)) = oid;
incr += sizeof(unsigned int);
memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
- control = TRANS_PREFETCH_RESPONSE;
- sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ send_data(sd, sendbuffer, size+1);
/* Calculate the oid corresponding to the offset value */
for(i = 0 ; i< numoffset ; i++) {
if((header = mhashSearch(oid)) == NULL) {
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
- char sendbuffer[size];
- *((int *) sendbuffer) = size;
- *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
- *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+ *((int *) (sendbuffer+1)) = size;
+ *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid;
- control = TRANS_PREFETCH_RESPONSE;
- sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ send_data(sd, sendbuffer, size+1);
break;
} else { /* Obj Found */
- int incr = 0;
+ int incr = 1;
GETSIZE(objsize, header);
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
- char sendbuffer[size];
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
*((int *)(sendbuffer + incr)) = size;
incr += sizeof(int);
*((char *)(sendbuffer + incr)) = OBJECT_FOUND;
*((unsigned int *)(sendbuffer+incr)) = oid;
incr += sizeof(unsigned int);
memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
- control = TRANS_PREFETCH_RESPONSE;
- sendPrefetchResponse(sd, &control, sendbuffer, &size);
+ send_data(sd, sendbuffer, size+1);
}
} //end of for
}
plistnode_t *createPiles();
plistnode_t *sortPiles(plistnode_t *pileptr);
+char bigarray[16*1024*1024];
+int bigindex=0;
+#define LOGEVENT(x) { \
+ int tmp=bigindex++; \
+ bigarray[tmp]=x; \
+ }
+
+
/*******************************
* Send and Recv function calls
*******************************/
}
}
+void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
+ char *buf=(char *)buffer;
+
+ int numbytes=readbuffer->head-readbuffer->tail;
+ if (numbytes>buflen)
+ numbytes=buflen;
+ if (numbytes>0) {
+ memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
+ readbuffer->tail+=numbytes;
+ buflen-=numbytes;
+ buf+=numbytes;
+ if (buflen==0) {
+ return;
+ }
+ }
+
+ if (buflen>=MAXBUF) {
+ recv_data(fd, buf, buflen);
+ return;
+ }
+
+ int maxbuf=MAXBUF;
+ int obufflen=buflen;
+ readbuffer->head=0;
+
+ while (buflen > 0) {
+ int numbytes = recv(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
+ if (numbytes == -1) {
+ perror("recv");
+ exit(0);
+ }
+ buflen-=numbytes;
+ readbuffer->head+=numbytes;
+ maxbuf-=numbytes;
+ }
+ memcpy(buf,readbuffer->buf,obufflen);
+ readbuffer->tail=obufflen;
+}
+
+int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
+ char *buf=(char *)buffer;
+ //now tail<=head
+ int numbytes=readbuffer->head-readbuffer->tail;
+ if (numbytes>buflen)
+ numbytes=buflen;
+ if (numbytes>0) {
+ memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
+ readbuffer->tail+=numbytes;
+ buflen-=numbytes;
+ buf+=numbytes;
+ if (buflen==0)
+ return 1;
+ }
+
+ if (buflen>=MAXBUF) {
+ return recv_data_errorcode(fd, buf, buflen);
+ }
+
+ int maxbuf=MAXBUF;
+ int obufflen=buflen;
+ readbuffer->head=0;
+
+ while (buflen > 0) {
+ int numbytes = recv(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
+ if (numbytes ==0) {
+ return 0;
+ }
+ if (numbytes==-1) {
+ perror("recvbuf");
+ return -1;
+ }
+ buflen-=numbytes;
+ readbuffer->head+=numbytes;
+ maxbuf-=numbytes;
+ }
+ memcpy(buf,readbuffer->buf,obufflen);
+ readbuffer->tail=obufflen;
+ return 1;
+}
+
+
void recv_data(int fd, void *buf, int buflen) {
char *buffer = (char *)(buf);
int size = buflen;
/* Allocate for the queue node*/
int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
int len;
- char * node= getmemory(qnodesize);
+#ifdef INLINEPREFETCH
+ char node[qnodesize];
+#else
+ char *node=getmemory(qnodesize);
+#endif
int top=endoffsets[ntuples-1];
- if (node==NULL)
+ if (node==NULL) {
+ LOGEVENT('D');
return;
+ }
/* Set queue node values */
/* TODO: Remove this after testing */
memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
+#ifdef INLINEPREFETCH
+ prefetchpile_t *pilehead = foundLocal(node);
+
+ if (pilehead!=NULL) {
+ // Get sock from shared pool
+
+ /* Send Prefetch Request */
+ prefetchpile_t *ptr = pilehead;
+ while(ptr != NULL) {
+ int sd = getSock2(transPrefetchSockPool, ptr->mid);
+ sendPrefetchReq(ptr, sd);
+ ptr = ptr->next;
+ }
+
+ /* Release socket */
+ // freeSock(transPrefetchSockPool, pilehead->mid, sd);
+
+ /* Deallocated pilehead */
+ mcdealloc(pilehead);
+ }
+#else
/* Lock and insert into primary prefetch queue */
movehead(qnodesize);
+#endif
}
/* This function starts up the transaction runtime. */
retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
} while(retval!=0);
#else
+#ifndef INLINEPREFETCH
do {
retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
} while(retval!=0);
+#endif
#endif
pthread_detach(tPrefetch);
#endif
#ifdef CACHE
if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
#ifdef TRANSSTATS
+ LOGEVENT('P')
nprehashSearch++;
#endif
/* Look up in prefetch cache */
return NULL;
} else {
#ifdef TRANSSTATS
+
+ LOGEVENT('R');
nRemoteSend++;
#endif
#ifdef COMPILER
trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
char finalResponse;
+#ifdef TRANSSTATS
+ int iii;
+ for(iii=0;iii<bigindex;iii++) {
+ printf("%c", bigarray[iii]);
+ }
+#endif
+
#ifdef ABORTREADERS
if (t_abort) {
//abort this transaction
unsigned int oid=oidarray[i];
int newbase;
int machinenum;
+
if (oid==0)
continue;
//Look up fields locally
objpile_t *tmp;
/* Send TRANS_PREFETCH control message */
- control = TRANS_PREFETCH;
- send_data(sd, &control, sizeof(char));
+ int first=1;
/* Send Oids and offsets in pairs */
tmp = mcpilenode->objpiles;
while(tmp != NULL) {
len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
- char oidnoffset[len];
+ char oidnoffset[len+5];
char *buf=oidnoffset;
+ if (first) {
+ *buf=TRANS_PREFETCH;
+ buf++;len++;
+ first=0;
+ }
*((int*)buf) = tmp->numoffset;
buf+=sizeof(int);
*((unsigned int *)buf) = tmp->oid;
*((unsigned int *)buf) = myIpAddr;
buf += sizeof(unsigned int);
memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
- send_data(sd, oidnoffset, len);
tmp = tmp->next;
+ if (tmp==NULL) {
+ *((int *)(&oidnoffset[len]))=-1;
+ len+=sizeof(int);
+ }
+ send_data(sd, oidnoffset, len);
}
- /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
- endpair = -1;
- send_data(sd, &endpair, sizeof(int));
-
+ LOGEVENT('S');
return;
}
-int getPrefetchResponse(int sd) {
+int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
int length = 0, size = 0;
char control;
unsigned int oid;
void *modptr, *oldptr;
- recv_data((int)sd, &length, sizeof(int));
+ recv_data_buf(sd, readbuffer, &length, sizeof(int));
size = length - sizeof(int);
char recvbuffer[size];
#ifdef TRANSSTATS
getResponse++;
+ LOGEVENT('Z');
#endif
- recv_data((int)sd, recvbuffer, size);
+ recv_data_buf(sd, readbuffer, recvbuffer, size);
control = *((char *) recvbuffer);
if(control == OBJECT_FOUND) {
oid = *((unsigned int *)(recvbuffer + sizeof(char)));