hacks to speed up prefetching...doesn't really help though..
authorbdemsky <bdemsky>
Sat, 3 May 2008 09:14:50 +0000 (09:14 +0000)
committerbdemsky <bdemsky>
Sat, 3 May 2008 09:14:50 +0000 (09:14 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/mcpileq.c
Robust/src/Runtime/DSTM/interface/mcpileq.h
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/queue.h
Robust/src/Runtime/DSTM/interface/sockpool.c
Robust/src/Runtime/DSTM/interface/trans.c

index 82e415dfa0ba9f1f5436bb32e6ee4dab1cb5a1ac..4b5b9e4be3de141af64f48347923d9950b0494fe 100644 (file)
@@ -8,10 +8,10 @@
 /***********************************************************
  *       Macros
  **********************************************************/
-#define GET_NTUPLES(x)         ((int *)(x + sizeof(prefetchqelem_t)))
-#define GET_PTR_OID(x)         ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
-#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
-#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
+#define GET_NTUPLES(x)         ((int *)(x))
+#define GET_PTR_OID(x)         ((unsigned int *)(x + sizeof(int)))
+#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(int) + (n*sizeof(unsigned int))))
+#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
 /*****************************************
  *  Coordinator Messages
  ***************************************/
@@ -268,7 +268,7 @@ void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size);
 void prefetch(int, unsigned int *, unsigned short *, short*);
 void *transPrefetch(void *);
 void *mcqProcess(void *);
-prefetchpile_t *foundLocal(prefetchqelem_t *);// returns node with prefetch elements(oids, offsets)
+prefetchpile_t *foundLocal(char *);// returns node with prefetch elements(oids, offsets)
 int lookupObject(unsigned int * oid, short offset);
 int transPrefetchProcess(transrecord_t *, int **, short);
 void sendPrefetchReq(prefetchpile_t*, int);
index ddacde978c9f36b67413a6e60c111962b25860d9..b5bcac5bc1a316ddd6f4efa748e9f529ffe4f421 100644 (file)
@@ -1,6 +1,7 @@
 /* Coordinator => Machine that initiates the transaction request call for commiting a transaction
  * Participant => Machines that host the objects involved in a transaction commit */
 
+#include <netinet/tcp.h>
 #include "dstm.h"
 #include "mlookup.h"
 #include "llookup.h"
@@ -103,7 +104,9 @@ void *dstmListen()
        while(1)
        {
          int retval;
+         int flag=1;
          acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
+         setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
          do {
            retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
          } while(retval!=0);
@@ -127,6 +130,8 @@ void *dstmAccept(void *acceptfd) {
   /* Receive control messages from other machines */
   while(1) {
     int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+    if (ret==0)
+      return;
     if (ret==-1) {
       printf("DEBUG -> RECV Error!.. retrying\n");
       break;
index a8e5d81f4e9dbd6745dd56cdf155c67b656857a4..bb58ecef88646541f0c55ff5fe77181187b80173 100644 (file)
@@ -38,13 +38,6 @@ prefetchpile_t *mcpiledequeue(void) {
   return retnode;
 }
 
-void mcpiledelete(void) {
-  /* Remove each element */
-  while(mcqueue.front != NULL)
-    delqnode();
-}
-
-
 void mcpiledisplay() {
   int mid;
   
index 5c4046c970bbef3ae302358a409b1b46fc9d4a24..f32c7b69232995d6f988cbb928c8812636438be8 100644 (file)
@@ -31,7 +31,6 @@ typedef struct mcpileq {
 void mcpileqInit(void);
 void mcpileenqueue(prefetchpile_t *, prefetchpile_t *);
 prefetchpile_t *mcpiledequeue(void);
-void mcpiledelete();
 void mcpiledisplay();
 void mcdealloc(prefetchpile_t *);
 
index 1056959b4dd51aaac76242b2236750850059c1a0..51d586c5f17f76100bf1e58f07a64283c8049c30 100644 (file)
@@ -1,81 +1,80 @@
 #include "queue.h"
 
-primarypfq_t pqueue; //Global queue
+volatile int headoffset, tailoffset;
+char * memory;
+pthread_mutex_t qlock;
+pthread_mutexattr_t qlockattr;
+pthread_cond_t qcond;
+
+
+#define QSIZE 1000000 //1 MB
 
 void queueInit(void) {
   /* Intitialize primary queue */
-  pqueue.front = pqueue.rear = NULL;
-  pthread_mutexattr_init(&pqueue.qlockattr);
-  pthread_mutexattr_settype(&pqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
-  pthread_mutex_init(&pqueue.qlock, &pqueue.qlockattr);
-  pthread_cond_init(&pqueue.qcond, NULL);
+  headoffset=0;
+  tailoffset=0;
+  memory=malloc(QSIZE);
+  pthread_mutexattr_init(&qlockattr);
+  pthread_mutexattr_settype(&qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
+  pthread_mutex_init(&qlock, &qlockattr);
+  pthread_cond_init(&qcond, NULL);
 }
 
-/* Delete the node pointed to by the front ptr of the queue */
-void delqnode() {
-  prefetchqelem_t *delnode;
-  if(pqueue.front == NULL) {
-    printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
-    return;
-  } else if (pqueue.front == pqueue.rear) {
-    free(pqueue.front);
-    pqueue.front = pqueue.rear = NULL;
+void * getmemory(int size) {
+  int tmpoffset=headoffset+size+sizeof(int);
+  if (tmpoffset>QSIZE) {
+    //Wait for tail to go past end
+    tmpoffset=size+sizeof(int);
+    while(headoffset<tailoffset)
+      ;
+    //Wait for tail to go past new start
+    while(tailoffset<tmpoffset)
+      ;
+    *((int *)(memory+headoffset))=-1;
+    *((int*)memory)=size+sizeof(int);
+    return memory+sizeof(int);
   } else {
-    delnode = pqueue.front;
-    pqueue.front = pqueue.front->next;
-    free(delnode);
+    while(headoffset<tailoffset&&tailoffset<tmpoffset)
+      ;
+    *((int*)(memory+headoffset))=size+sizeof(int);
+    return memory+headoffset+sizeof(int);
   }
 }
 
-void queueDelete(void) {
-  /* Remove each element */
-  while(pqueue.front != NULL)
-    delqnode();
+void movehead(int size) {
+  int tmpoffset=headoffset+size+sizeof(int);
+  if (tmpoffset>QSIZE) {
+    headoffset=size+sizeof(int);
+  } else
+    headoffset=tmpoffset;
+  pthread_cond_signal(&qcond);//wake the other thread up
 }
 
-/* Inserts to the rear of primary prefetch queue */
-void pre_enqueue(prefetchqelem_t *qnode) {
-  if(pqueue.front == NULL) {
-    pqueue.front = pqueue.rear = qnode;
-    qnode->next=NULL;
-  } else {
-    qnode->next = NULL;
-    pqueue.rear->next = qnode;
-    pqueue.rear = qnode;
+void * gettail() {
+  while(tailoffset==headoffset) {
+    //Sleep
+    pthread_mutex_lock(&qlock);
+    if (tailoffset==headoffset)
+      pthread_cond_wait(&qcond, &qlock);
+    pthread_mutex_unlock(&qlock);
   }
-}
-
-/* Return the node pointed to by the front ptr of the queue */
-prefetchqelem_t *pre_dequeue(void) {
-  prefetchqelem_t *retnode;
-  if (pqueue.front == NULL) {
-    printf("Queue empty: Underflow %s, %d\n", __FILE__, __LINE__);
-    return NULL;
+  if (*((int *)(memory+tailoffset))==-1) {
+    tailoffset=0;//do loop
   }
-  retnode = pqueue.front;
-  pqueue.front = pqueue.front->next;
-  if (pqueue.front == NULL)
-    pqueue.rear = NULL;
-  retnode->next = NULL;
-  
-  return retnode;
+
+  return memory+tailoffset+sizeof(int);
 }
 
-void queueDisplay() {
-  int offset = sizeof(prefetchqelem_t);
-  int *ptr;
-  int ntuples;
-  char *ptr1;
-  prefetchqelem_t *tmp = pqueue.front;
-  while(tmp != NULL) {
-    ptr1 = (char *) tmp;
-    ptr = (int *)(ptr1 + offset);
-    ntuples = *ptr;
-    tmp = tmp->next;
-  }
+void inctail() {
+  int tmpoffset=tailoffset+*((int *)(memory+tailoffset));
+  if (tmpoffset>QSIZE)
+    tailoffset=0;
+  else
+    tailoffset=tmpoffset;
 }
 
-void predealloc(prefetchqelem_t *node) {
-  free(node);
+
+void predealloc() {
+  free(memory);
 }
 
index d315135bc0e6fe301e2f78fdb0e5dab034e0574f..d8a751fdaca5dd9db5082053c2090e1fa7058d12 100644 (file)
@@ -6,24 +6,10 @@
 #include<pthread.h>
 #include<string.h>
 
-// DS that contains information to be shared between threads.
-typedef struct prefetchqelem {
-       struct prefetchqelem *next;
-} prefetchqelem_t;
-
-typedef struct primarypfq {
-       prefetchqelem_t *front, *rear;
-       pthread_mutex_t qlock;
-       pthread_mutexattr_t qlockattr;
-       pthread_cond_t qcond;
-} primarypfq_t; 
-
-
 void queueInit(void);
-void delqnode(); 
-void queueDelete(void);
-void pre_enqueue(prefetchqelem_t *);
-prefetchqelem_t *pre_dequeue(void);
-void queueDisplay();
-void predealloc(prefetchqelem_t *);
+void * getmemory(int size);
+void movehead(int size);
+void * gettail();
+void inctail();
+void predealloc();
 #endif
index 64337b25927215a187a8e8b87799ba45f099a490..e45cdc552456d836db450b77b72b00416ad129c4 100644 (file)
@@ -1,5 +1,5 @@
 #include "sockpool.h"
-
+#include <netinet/tcp.h>
 
 #if defined(__i386__)
 inline static int test_and_set(volatile unsigned int *addr) {
@@ -59,10 +59,12 @@ sockPoolHashTable_t *createSockPool(sockPoolHashTable_t * sockhash, unsigned int
 
 int createNewSocket(unsigned int mid) {
   int sd;
+  int flag=1;
   if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
     printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__);
     return -1;
   }
+  setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
   struct sockaddr_in remoteAddr;
   bzero(&remoteAddr, sizeof(remoteAddr));
   remoteAddr.sin_family = AF_INET;
index 4141f2227a9f5a1e8485dfedf11513892bf7ec87..0fdd0443a535af76fec3d50b2b602c948edc016f 100644 (file)
@@ -18,7 +18,6 @@
 
 /* Global Variables */
 extern int classsize[];
-extern primarypfq_t pqueue; //Shared prefetch queue
 objstr_t *prefetchcache; //Global Prefetch cache
 pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
@@ -83,13 +82,15 @@ int recv_data_errorcode(int fd , void *buf, int buflen) {
   int numbytes; 
   while (size > 0) {
     numbytes = recv(fd, buffer, size, 0);
+    if (numbytes==0)
+      return 0;
     if (numbytes == -1) {
       return -1;
     }
     buffer += numbytes;
     size -= numbytes;
   }
-  return 0;
+  return 1;
 }
 
 void printhex(unsigned char *ptr, int numBytes) {
@@ -126,22 +127,19 @@ inline int findmax(int *array, int arraylength) {
  * populates the shared primary prefetch queue*/
 void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
   /* Allocate for the queue node*/
-  int qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
-  char * node= malloc(qnodesize);
+  int qnodesize = sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
+  char * node= getmemory(qnodesize);
   /* Set queue node values */
-  int len = sizeof(prefetchqelem_t);
+  int len;
   int top=endoffsets[ntuples-1];
-  *((int *)(node+len))=ntuples;
-  len += sizeof(int);
+  *((int *)(node))=ntuples;
+  len = sizeof(int);
   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
 
   /* Lock and insert into primary prefetch queue */
-  pthread_mutex_lock(&pqueue.qlock);
-  pre_enqueue((prefetchqelem_t *)node);
-  pthread_cond_signal(&pqueue.qcond);
-  pthread_mutex_unlock(&pqueue.qlock);
+  movehead(qnodesize);
 }
 
 /* This function starts up the transaction runtime. */
@@ -1019,8 +1017,7 @@ int transComProcess(local_thread_data_array_t  *localtdata) {
        return 0;
 }
 
-prefetchpile_t *foundLocal(prefetchqelem_t *node) {
-  char * ptr = (char *) node;
+prefetchpile_t *foundLocal(char *ptr) {
   int ntuples = *(GET_NTUPLES(ptr));
   unsigned int * oidarray = GET_PTR_OID(ptr);
   unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples); 
@@ -1104,19 +1101,10 @@ int lookupObject(unsigned int * oid, short offset) {
 void *transPrefetch(void *t) {
   while(1) {
     /* lock mutex of primary prefetch queue */
-    pthread_mutex_lock(&pqueue.qlock);
-    /* while primary queue is empty, then wait */
-    while(pqueue.front == NULL) {
-      pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
-    }
-    
-    /* dequeue node to create a machine piles and  finally unlock mutex */
-    prefetchqelem_t *qnode = pre_dequeue();
-    pthread_mutex_unlock(&pqueue.qlock);
-    
+    void *node=gettail();
     /* Check if the tuples are found locally, if yes then reduce them further*/ 
     /* and group requests by remote machine ids by calling the makePreGroups() */
-    prefetchpile_t *pilehead = foundLocal(qnode);
+    prefetchpile_t *pilehead = foundLocal(node);
 
     if (pilehead!=NULL) {
       // Get sock from shared pool 
@@ -1134,10 +1122,9 @@ void *transPrefetch(void *t) {
       
       /* Deallocated pilehead */
       mcdealloc(pilehead);
-      
     }
     // Deallocate the prefetch queue pile node
-    predealloc(qnode);
+    inctail();
   }
 }
 
@@ -1217,6 +1204,7 @@ int getPrefetchResponse(int sd) {
   control = *((char *) recvbuffer);
   if(control == OBJECT_FOUND) {
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+    //printf("oid %d found\n",oid);
     size = size - (sizeof(char) + sizeof(unsigned int));
     pthread_mutex_lock(&prefetchcache_mutex);
     if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {