/* Read from prefetch queue */
void *node = gettail();
+ int count = numavailable();
/* Check tuples if they are found locally */
- perMcPrefetchList_t* pilehead = processLocal(node);
+ perMcPrefetchList_t* pilehead = processLocal(node,count);
if (pilehead!=NULL) {
/* Deallocated pilehead */
proPrefetchQDealloc(pilehead);
}
+
// Deallocate the prefetch queue pile node
- inctail();
+ incmulttail(count);
+ //inctail();
}
}
-perMcPrefetchList_t *processLocal(char *ptr) {
- unsigned int oid = *(GET_OID(ptr));
- short numoffset = *(GET_NUM_OFFSETS(ptr));
- short *offsetarray = GET_OFFSETS(ptr);
- int top;
- unsigned int dfsList[numoffset];
- int offstop=numoffset-2;
-
+perMcPrefetchList_t *processLocal(char *ptr, int numprefetches) {
+ int i,j;
/* Initialize */
perMcPrefetchList_t *head = NULL;
- objheader_t * header = searchObj(oid);
- if (header==NULL) {
- //forward prefetch
- int machinenum = lhashSearch(oid);
- insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
- return head;
- }
- dfsList[0]=oid;
- dfsList[1]=0;
-
-
- //Start searching the dfsList
- for(top=0; top>=0;) {
- oid=getNextOid(header, offsetarray, dfsList, top);
- if (oid&1) {
- int oldisField=TYPE(header) < NUMCLASSES;
- top+=2;
- dfsList[top]=oid;
- dfsList[top+1]=0;
- header=searchObj(oid);
- if (header==NULL) {
- //forward prefetch
- int machinenum = lhashSearch(oid);
-
- if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1])))
- insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head);
- else
- insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head);
- } else if (top<offstop)
- //okay to continue going down
- continue;
- } else if (oid==2) {
- //send prefetch first
- int objindex=top+2;
- int machinenum = lhashSearch(dfsList[objindex]);
- insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
+ for(j=0;j<numprefetches; j++) {
+ unsigned int oid = *(GET_OID(ptr));
+ short numoffset = *(GET_NUM_OFFSETS(ptr));
+ short *offsetarray = GET_OFFSETS(ptr);
+ int top;
+ unsigned int dfsList[numoffset];
+ int offstop=numoffset-2;
+
+
+ objheader_t * header = searchObj(oid);
+ if (header==NULL) {
+ //forward prefetch
+ int machinenum = lhashSearch(oid);
+ insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
+ return head;
}
- //oid is 0
- //go backwards until we can increment
- do {
+ dfsList[0]=oid;
+ dfsList[1]=0;
+
+
+ //Start searching the dfsList
+ for(top=0; top>=0;) {
+ oid=getNextOid(header, offsetarray, dfsList, top);
+ if (oid&1) {
+ int oldisField=TYPE(header) < NUMCLASSES;
+ top+=2;
+ dfsList[top]=oid;
+ dfsList[top+1]=0;
+ header=searchObj(oid);
+ if (header==NULL) {
+ //forward prefetch
+ int machinenum = lhashSearch(oid);
+
+ if (oldisField&&(dfsList[top-1]!=GET_RANGE(offsetarray[top+1])))
+ insertPrefetch(machinenum, oid, 2+numoffset-top, &offsetarray[top-2], &head);
+ else
+ insertPrefetch(machinenum, oid, numoffset-top, &offsetarray[top], &head);
+ } else if (top<offstop)
+ //okay to continue going down
+ continue;
+ } else if (oid==2) {
+ //send prefetch first
+ int objindex=top+2;
+ int machinenum = lhashSearch(dfsList[objindex]);
+ insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
+ }
+ //oid is 0
+ //go backwards until we can increment
do {
- top-=2;
- if (top<0)
- return head;
- } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
-
- header=searchObj(dfsList[top]);
- //header shouldn't be null unless the object moves away, but allow
- //ourselves the option to just continue on if we lose the object
- } while(header==NULL);
- //increment
- dfsList[top+1]++;
+ do {
+ top-=2;
+ if (top<0)
+ return head;
+ } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
+
+ header=searchObj(dfsList[top]);
+ //header shouldn't be null unless the object moves away, but allow
+ //ourselves the option to just continue on if we lose the object
+ } while(header==NULL);
+ //increment
+ dfsList[top+1]++;
+ }
}
return head;
}
int len, endpair;
char control;
objOffsetPile_t *tmp;
+ struct writestruct writebuffer;
+ writebuffer.offset=0;
+
/* Send TRANS_PREFETCH control message */
- control = TRANS_PREFETCH;
- send_data(sd, &control, sizeof(char));
+ int first=1;
+ //control = TRANS_PREFETCH;
+ //send_data(sd, &control, sizeof(char));
/* Send Oids and offsets in pairs */
tmp = mcpilenode->list;
while(tmp != NULL) {
len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
- char oidnoffset[len];
+ char oidnoffset[len+5];
char *buf=oidnoffset;
+ if (first) {
+ *buf=TRANS_PREFETCH;
+ buf++;len++;
+ first=0;
+ }
*((int*)buf) = tmp->numoffset;
buf+=sizeof(int);
*((unsigned int *)buf) = tmp->oid;
buf+=sizeof(unsigned int);
+#ifdef TRANSSTATS
+ sendRemoteReq++;
+#endif
*((unsigned int *)buf) = mid;
buf += sizeof(unsigned int);
memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short));
- send_data(sd, oidnoffset, len);
tmp = tmp->next;
+ if(tmp==NULL) {
+ *((int*)(&oidnoffset[len]))=-1;
+ len+=sizeof(int);
+ }
+ if(tmp!=NULL)
+ send_buf(sd, & writebuffer, oidnoffset, len);
+ else
+ forcesend_buf(sd, & writebuffer, oidnoffset, len);
+ //send_data(sd, oidnoffset, len);
+ //tmp = tmp->next;
}
/* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
- endpair = -1;
- send_data(sd, &endpair, sizeof(int));
+ //endpair = -1;
+ //send_data(sd, &endpair, sizeof(int));
return;
}
recv_data_buf(sd, readbuffer, &length, sizeof(int));
int size = length - sizeof(int);
char recvbuffer[size];
+#ifdef TRANSSTATS
+ getResponse++;
+#endif
recv_data_buf(sd, readbuffer, recvbuffer, size);
char control = *((char *) recvbuffer);
unsigned int oid;
int numoffset, sd = -1;
unsigned int baseoid, mid = -1;
oidmidpair_t oidmid;
+ struct writestruct writebuffer;
while (1) {
recv_data_buf(acceptfd, readbuffer, &numoffset, sizeof(int));
recv_data_buf(acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int));
baseoid = oidmid.oid;
if(mid != oidmid.mid) {
- if(mid!= -1)
- freeSockWithLock(transPResponseSocketPool, mid, sd);
+ if(mid!= -1) {
+ forcesend_buf(sd, &writebuffer, NULL, 0);
+ freeSockWithLock(transPResponseSocketPool, mid, sd);
+ }
mid = oidmid.mid;
sd = getSockWithLock(transPResponseSocketPool, mid);
+ writebuffer.offset=0;
}
short offsetsarry[numoffset];
recv_data_buf(acceptfd, readbuffer, offsetsarry, numoffset*sizeof(short));
//Release socket
if(mid!=-1)
+ forcesend_buf(sd,&writebuffer, NULL, 0);
freeSockWithLock(transPResponseSocketPool, mid, sd);
return 0;
}
#include <pthread.h>
#endif
#ifdef STMLOG
+#define ARRAY_LENGTH 700003
__thread int counter;
-__thread int event[100000*7+3];
-__thread unsigned long long clkticks[100000*7+3];
+__thread int event[ARRAY_LENGTH];
+__thread unsigned long long clkticks[ARRAY_LENGTH];
+unsigned long long beginClock=0;
#define FILENAME "log"
#endif
return;
}
+void CALL00(___System______logevent____) {
+#ifdef STMLOG
+ beginClock= rdtsc();
+#endif
+ return;
+}
+
void CALL11(___System______flushToFile____I, int ___threadid___, int ___threadid___) {
#ifdef STMLOG
FILE *fp;
}
int i;
for (i = 0; i < counter-1; i++) {
- fprintf(fp, "%d %lld %lld\n", event[i], clkticks[i], clkticks[i+1]);
+ fprintf(fp, "%d %lld %lld\n", event[i], clkticks[i]-beginClock, clkticks[i+1]-beginClock);
}
- fprintf(fp, "%d %lld\n", event[i], clkticks[i]);
+ fprintf(fp, "%d %lld\n", event[i], clkticks[i]-beginClock);
fclose(fp);
#endif
void CALL00(___System______initLog____) {
#ifdef STMLOG
counter=0;
+ int i;
+ for(i=0; i<ARRAY_LENGTH; i++) {
+ event[i] = 0;
+ clkticks[i] = 0;
+ }
+
#endif
return;
}
#endif
}
#endif
+
#endif // DSTM
/* STM Barrier constructs */
__attribute__((malloc)) void * allocate_newglobal(int type) {
struct ___Object___ * v=(struct ___Object___ *) transCreateObj(classsize[type]);
v->type=type;
+ //printf("DEBUG %s(), type= %x\n", __func__, type);
#ifdef THREADS
v->tid=0;
v->lockentry=0;