From f7a594bd00dc33b0d04b8aefa58c13ac7216686a Mon Sep 17 00:00:00 2001 From: bdemsky Date: Sat, 3 May 2008 09:14:50 +0000 Subject: [PATCH] hacks to speed up prefetching...doesn't really help though.. --- Robust/src/Runtime/DSTM/interface/dstm.h | 10 +- .../src/Runtime/DSTM/interface/dstmserver.c | 5 + Robust/src/Runtime/DSTM/interface/mcpileq.c | 7 -- Robust/src/Runtime/DSTM/interface/mcpileq.h | 1 - Robust/src/Runtime/DSTM/interface/queue.c | 119 +++++++++--------- Robust/src/Runtime/DSTM/interface/queue.h | 24 +--- Robust/src/Runtime/DSTM/interface/sockpool.c | 4 +- Robust/src/Runtime/DSTM/interface/trans.c | 40 +++--- 8 files changed, 91 insertions(+), 119 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 82e415df..4b5b9e4b 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -8,10 +8,10 @@ /*********************************************************** * Macros **********************************************************/ -#define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t))) -#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int))) -#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)))) -#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short)))) +#define GET_NTUPLES(x) ((int *)(x)) +#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(int))) +#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(int) + (n*sizeof(unsigned int)))) +#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short)))) /***************************************** * Coordinator Messages ***************************************/ @@ -268,7 +268,7 @@ void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size); void prefetch(int, unsigned int *, unsigned short *, short*); void *transPrefetch(void *); void *mcqProcess(void *); -prefetchpile_t *foundLocal(prefetchqelem_t *);// returns node with prefetch elements(oids, offsets) +prefetchpile_t *foundLocal(char *);// returns node with prefetch elements(oids, offsets) int lookupObject(unsigned int * oid, short offset); int transPrefetchProcess(transrecord_t *, int **, short); void sendPrefetchReq(prefetchpile_t*, int); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index ddacde97..b5bcac5b 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -1,6 +1,7 @@ /* Coordinator => Machine that initiates the transaction request call for commiting a transaction * Participant => Machines that host the objects involved in a transaction commit */ +#include #include "dstm.h" #include "mlookup.h" #include "llookup.h" @@ -103,7 +104,9 @@ void *dstmListen() while(1) { int retval; + int flag=1; acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength); + setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag)); do { retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd); } while(retval!=0); @@ -127,6 +130,8 @@ void *dstmAccept(void *acceptfd) { /* Receive control messages from other machines */ while(1) { int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char)); + if (ret==0) + return; if (ret==-1) { printf("DEBUG -> RECV Error!.. retrying\n"); break; diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c index a8e5d81f..bb58ecef 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.c +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.c @@ -38,13 +38,6 @@ prefetchpile_t *mcpiledequeue(void) { return retnode; } -void mcpiledelete(void) { - /* Remove each element */ - while(mcqueue.front != NULL) - delqnode(); -} - - void mcpiledisplay() { int mid; diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.h b/Robust/src/Runtime/DSTM/interface/mcpileq.h index 5c4046c9..f32c7b69 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.h +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.h @@ -31,7 +31,6 @@ typedef struct mcpileq { void mcpileqInit(void); void mcpileenqueue(prefetchpile_t *, prefetchpile_t *); prefetchpile_t *mcpiledequeue(void); -void mcpiledelete(); void mcpiledisplay(); void mcdealloc(prefetchpile_t *); diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index 1056959b..51d586c5 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -1,81 +1,80 @@ #include "queue.h" -primarypfq_t pqueue; //Global queue +volatile int headoffset, tailoffset; +char * memory; +pthread_mutex_t qlock; +pthread_mutexattr_t qlockattr; +pthread_cond_t qcond; + + +#define QSIZE 1000000 //1 MB void queueInit(void) { /* Intitialize primary queue */ - pqueue.front = pqueue.rear = NULL; - pthread_mutexattr_init(&pqueue.qlockattr); - pthread_mutexattr_settype(&pqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP); - pthread_mutex_init(&pqueue.qlock, &pqueue.qlockattr); - pthread_cond_init(&pqueue.qcond, NULL); + headoffset=0; + tailoffset=0; + memory=malloc(QSIZE); + pthread_mutexattr_init(&qlockattr); + pthread_mutexattr_settype(&qlockattr, PTHREAD_MUTEX_RECURSIVE_NP); + pthread_mutex_init(&qlock, &qlockattr); + pthread_cond_init(&qcond, NULL); } -/* Delete the node pointed to by the front ptr of the queue */ -void delqnode() { - prefetchqelem_t *delnode; - if(pqueue.front == NULL) { - printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__); - return; - } else if (pqueue.front == pqueue.rear) { - free(pqueue.front); - pqueue.front = pqueue.rear = NULL; +void * getmemory(int size) { + int tmpoffset=headoffset+size+sizeof(int); + if (tmpoffset>QSIZE) { + //Wait for tail to go past end + tmpoffset=size+sizeof(int); + while(headoffsetnext; - free(delnode); + while(headoffsetQSIZE) { + headoffset=size+sizeof(int); + } else + headoffset=tmpoffset; + pthread_cond_signal(&qcond);//wake the other thread up } -/* Inserts to the rear of primary prefetch queue */ -void pre_enqueue(prefetchqelem_t *qnode) { - if(pqueue.front == NULL) { - pqueue.front = pqueue.rear = qnode; - qnode->next=NULL; - } else { - qnode->next = NULL; - pqueue.rear->next = qnode; - pqueue.rear = qnode; +void * gettail() { + while(tailoffset==headoffset) { + //Sleep + pthread_mutex_lock(&qlock); + if (tailoffset==headoffset) + pthread_cond_wait(&qcond, &qlock); + pthread_mutex_unlock(&qlock); } -} - -/* Return the node pointed to by the front ptr of the queue */ -prefetchqelem_t *pre_dequeue(void) { - prefetchqelem_t *retnode; - if (pqueue.front == NULL) { - printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__); - return NULL; + if (*((int *)(memory+tailoffset))==-1) { + tailoffset=0;//do loop } - retnode = pqueue.front; - pqueue.front = pqueue.front->next; - if (pqueue.front == NULL) - pqueue.rear = NULL; - retnode->next = NULL; - - return retnode; + + return memory+tailoffset+sizeof(int); } -void queueDisplay() { - int offset = sizeof(prefetchqelem_t); - int *ptr; - int ntuples; - char *ptr1; - prefetchqelem_t *tmp = pqueue.front; - while(tmp != NULL) { - ptr1 = (char *) tmp; - ptr = (int *)(ptr1 + offset); - ntuples = *ptr; - tmp = tmp->next; - } +void inctail() { + int tmpoffset=tailoffset+*((int *)(memory+tailoffset)); + if (tmpoffset>QSIZE) + tailoffset=0; + else + tailoffset=tmpoffset; } -void predealloc(prefetchqelem_t *node) { - free(node); + +void predealloc() { + free(memory); } diff --git a/Robust/src/Runtime/DSTM/interface/queue.h b/Robust/src/Runtime/DSTM/interface/queue.h index d315135b..d8a751fd 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.h +++ b/Robust/src/Runtime/DSTM/interface/queue.h @@ -6,24 +6,10 @@ #include #include -// DS that contains information to be shared between threads. -typedef struct prefetchqelem { - struct prefetchqelem *next; -} prefetchqelem_t; - -typedef struct primarypfq { - prefetchqelem_t *front, *rear; - pthread_mutex_t qlock; - pthread_mutexattr_t qlockattr; - pthread_cond_t qcond; -} primarypfq_t; - - void queueInit(void); -void delqnode(); -void queueDelete(void); -void pre_enqueue(prefetchqelem_t *); -prefetchqelem_t *pre_dequeue(void); -void queueDisplay(); -void predealloc(prefetchqelem_t *); +void * getmemory(int size); +void movehead(int size); +void * gettail(); +void inctail(); +void predealloc(); #endif diff --git a/Robust/src/Runtime/DSTM/interface/sockpool.c b/Robust/src/Runtime/DSTM/interface/sockpool.c index 64337b25..e45cdc55 100644 --- a/Robust/src/Runtime/DSTM/interface/sockpool.c +++ b/Robust/src/Runtime/DSTM/interface/sockpool.c @@ -1,5 +1,5 @@ #include "sockpool.h" - +#include #if defined(__i386__) inline static int test_and_set(volatile unsigned int *addr) { @@ -59,10 +59,12 @@ sockPoolHashTable_t *createSockPool(sockPoolHashTable_t * sockhash, unsigned int int createNewSocket(unsigned int mid) { int sd; + int flag=1; if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__); return -1; } + setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag)); struct sockaddr_in remoteAddr; bzero(&remoteAddr, sizeof(remoteAddr)); remoteAddr.sin_family = AF_INET; diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 4141f222..0fdd0443 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -18,7 +18,6 @@ /* Global Variables */ extern int classsize[]; -extern primarypfq_t pqueue; //Shared prefetch queue objstr_t *prefetchcache; //Global Prefetch cache pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */ @@ -83,13 +82,15 @@ int recv_data_errorcode(int fd , void *buf, int buflen) { int numbytes; while (size > 0) { numbytes = recv(fd, buffer, size, 0); + if (numbytes==0) + return 0; if (numbytes == -1) { return -1; } buffer += numbytes; size -= numbytes; } - return 0; + return 1; } void printhex(unsigned char *ptr, int numBytes) { @@ -126,22 +127,19 @@ inline int findmax(int *array, int arraylength) { * populates the shared primary prefetch queue*/ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) { /* Allocate for the queue node*/ - int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); - char * node= malloc(qnodesize); + int qnodesize = sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); + char * node= getmemory(qnodesize); /* Set queue node values */ - int len = sizeof(prefetchqelem_t); + int len; int top=endoffsets[ntuples-1]; - *((int *)(node+len))=ntuples; - len += sizeof(int); + *((int *)(node))=ntuples; + len = sizeof(int); memcpy(node+len, oids, ntuples*sizeof(unsigned int)); memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short)); memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short)); /* Lock and insert into primary prefetch queue */ - pthread_mutex_lock(&pqueue.qlock); - pre_enqueue((prefetchqelem_t *)node); - pthread_cond_signal(&pqueue.qcond); - pthread_mutex_unlock(&pqueue.qlock); + movehead(qnodesize); } /* This function starts up the transaction runtime. */ @@ -1019,8 +1017,7 @@ int transComProcess(local_thread_data_array_t *localtdata) { return 0; } -prefetchpile_t *foundLocal(prefetchqelem_t *node) { - char * ptr = (char *) node; +prefetchpile_t *foundLocal(char *ptr) { int ntuples = *(GET_NTUPLES(ptr)); unsigned int * oidarray = GET_PTR_OID(ptr); unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples); @@ -1104,19 +1101,10 @@ int lookupObject(unsigned int * oid, short offset) { void *transPrefetch(void *t) { while(1) { /* lock mutex of primary prefetch queue */ - pthread_mutex_lock(&pqueue.qlock); - /* while primary queue is empty, then wait */ - while(pqueue.front == NULL) { - pthread_cond_wait(&pqueue.qcond, &pqueue.qlock); - } - - /* dequeue node to create a machine piles and finally unlock mutex */ - prefetchqelem_t *qnode = pre_dequeue(); - pthread_mutex_unlock(&pqueue.qlock); - + void *node=gettail(); /* Check if the tuples are found locally, if yes then reduce them further*/ /* and group requests by remote machine ids by calling the makePreGroups() */ - prefetchpile_t *pilehead = foundLocal(qnode); + prefetchpile_t *pilehead = foundLocal(node); if (pilehead!=NULL) { // Get sock from shared pool @@ -1134,10 +1122,9 @@ void *transPrefetch(void *t) { /* Deallocated pilehead */ mcdealloc(pilehead); - } // Deallocate the prefetch queue pile node - predealloc(qnode); + inctail(); } } @@ -1217,6 +1204,7 @@ int getPrefetchResponse(int sd) { control = *((char *) recvbuffer); if(control == OBJECT_FOUND) { oid = *((unsigned int *)(recvbuffer + sizeof(char))); + //printf("oid %d found\n",oid); size = size - (sizeof(char) + sizeof(unsigned int)); pthread_mutex_lock(&prefetchcache_mutex); if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) { -- 2.34.1