changes
authorbdemsky <bdemsky>
Tue, 22 Sep 2009 10:34:04 +0000 (10:34 +0000)
committerbdemsky <bdemsky>
Tue, 22 Sep 2009 10:34:04 +0000 (10:34 +0000)
Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/prefetch.c
Robust/src/Runtime/DSTM/interface/prefetch.h
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/trans.c

index ca7c845a0dec9c2235e665ef0e3a52cf58e5673b..c3cc8bd14b57f267e91d92f0053772028a6af5e0 100644 (file)
@@ -51,6 +51,7 @@ void handleDynPrefetching(int numLocal, int ntuples, int siteid) {
     if(getOperationMode(siteid) != 0) {
       evalPrefetch[siteid].uselesscount--;
       if(evalPrefetch[siteid].uselesscount <= 0) {
+       printf("O");
        evalPrefetch[siteid].operMode = 0;
       }
     }
index 9a993d88c25ada7b162ab0289df0c753057b1266..983405db5c0a6f31455217e232dbfb4faddb4458 100644 (file)
@@ -82,6 +82,7 @@
 #include <signal.h>
 #include "plookup.h"
 #include "dsmdebug.h"
+#include "readstruct.h"
 #ifdef ABORTREADERS
 #include <setjmp.h>
 #endif
@@ -234,8 +235,8 @@ void clearObjStore(); // TODO:currently only clears the prefetch cache object st
 void *dstmListen(void *);
 int startlistening();
 void *dstmAccept(void *);
-int readClientReq(trans_commit_data_t *, int);
-int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int);
+int readClientReq(trans_commit_data_t *, int, struct readstruct * readbuffer);
+int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int, struct readstruct *);
 char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int);
 char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
 int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
@@ -292,7 +293,7 @@ int checkoid(unsigned int oid);
 int transPrefetchProcess(int **, short);
 void sendPrefetchReq(prefetchpile_t*, int);
 void sendPrefetchReqnew(prefetchpile_t*, int);
-int getPrefetchResponse(int);
+int getPrefetchResponse(int, struct readstruct *);
 unsigned short getObjType(unsigned int oid);
 int startRemoteThread(unsigned int oid, unsigned int mid);
 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs);
index ca40fac2ad2c07d4768e9019f4143ca77510c072..edfbb4e8c2ced0a8649002593698333ca6e2a3c0 100644 (file)
@@ -12,6 +12,7 @@
 #include "thread.h"
 #endif
 #include "gCollect.h"
+#include "readstruct.h"
 
 #define BACKLOG 10 //max pending connections
 #define RECEIVE_BUFFER_SIZE 2048
@@ -130,10 +131,13 @@ void *dstmAccept(void *acceptfd) {
   trans_commit_data_t transinfo;
   unsigned short objType, *versionarry, version;
   unsigned int *oidarry, numoid, mid, threadid;
+  struct readstruct readbuffer;
+  readbuffer.head=0;
+  readbuffer.tail=0;
 
   /* Receive control messages from other machines */
   while(1) {
-    int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+    int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char));
     if (ret==0)
       break;
     if (ret==-1) {
@@ -143,7 +147,7 @@ void *dstmAccept(void *acceptfd) {
     switch(control) {
     case READ_REQUEST:
       /* Read oid requested and search if available */
-      recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+      recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
       while((srcObj = mhashSearch(oid)) == NULL) {
        int ret;
        if((ret = sched_yield()) != 0) {
@@ -182,7 +186,7 @@ void *dstmAccept(void *acceptfd) {
       transinfo.modptr = NULL;
       transinfo.numlocked = 0;
       transinfo.numnotfound = 0;
-      if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
+      if((val = readClientReq(&transinfo, (int)acceptfd, &readbuffer)) != 0) {
        printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
        pthread_exit(NULL);
       }
@@ -190,12 +194,12 @@ void *dstmAccept(void *acceptfd) {
 
     case TRANS_PREFETCH:
 #ifdef RANGEPREFETCH
-      if((val = rangePrefetchReq((int)acceptfd)) != 0) {
+      if((val = rangePrefetchReq((int)acceptfd, &readbuffer)) != 0) {
        printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
        break;
       }
 #else
-      if((val = prefetchReq((int)acceptfd)) != 0) {
+      if((val = prefetchReq((int)acceptfd, &readbuffer)) != 0) {
        printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
        break;
       }
@@ -204,12 +208,12 @@ void *dstmAccept(void *acceptfd) {
 
     case TRANS_PREFETCH_RESPONSE:
 #ifdef RANGEPREFETCH
-      if((val = getRangePrefetchResponse((int)acceptfd)) != 0) {
+      if((val = getRangePrefetchResponse((int)acceptfd, &readbuffer)) != 0) {
        printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
        break;
       }
 #else
-      if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+      if((val = getPrefetchResponse((int) acceptfd, &readbuffer)) != 0) {
        printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
        break;
       }
@@ -217,20 +221,20 @@ void *dstmAccept(void *acceptfd) {
       break;
 
     case START_REMOTE_THREAD:
-      recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+      recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
       objType = getObjType(oid);
       startDSMthread(oid, objType);
       break;
 
     case THREAD_NOTIFY_REQUEST:
-      recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
+      recv_data_buf((int)acceptfd, &readbuffer, &numoid, sizeof(unsigned int));
       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
       if((buffer = calloc(1,size)) == NULL) {
        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
        pthread_exit(NULL);
       }
 
-      recv_data((int)acceptfd, buffer, size);
+      recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
 
       oidarry = calloc(numoid, sizeof(unsigned int));
       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
@@ -243,7 +247,6 @@ void *dstmAccept(void *acceptfd) {
       threadid = *((unsigned int *)(buffer+size));
       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
       free(buffer);
-
       break;
 
     case THREAD_NOTIFY_RESPONSE:
@@ -253,7 +256,7 @@ void *dstmAccept(void *acceptfd) {
        pthread_exit(NULL);
       }
 
-      recv_data((int)acceptfd, buffer, size);
+      recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
 
       oid = *((unsigned int *)buffer);
       size = sizeof(unsigned int);
@@ -281,7 +284,7 @@ closeconnection:
 
 /* This function reads the information available in a transaction request
  * and makes a function call to process the request */
-int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
+int readClientReq(trans_commit_data_t *transinfo, int acceptfd, struct readstruct * readbuffer) {
   char *ptr;
   void *modptr;
   unsigned int *oidmod, oid;
@@ -295,14 +298,14 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
   size = sizeof(fixed) - 1;
   ptr = (char *)&fixed;;
   fixed.control = TRANS_REQUEST;
-  recv_data((int)acceptfd, ptr+1, size);
+  recv_data_buf((int)acceptfd, readbuffer, ptr+1, size);
 
   /* Read list of mids */
   int mcount = fixed.mcount;
   size = mcount * sizeof(unsigned int);
   unsigned int listmid[mcount];
   ptr = (char *) listmid;
-  recv_data((int)acceptfd, ptr, size);
+  recv_data_buf((int)acceptfd, readbuffer, ptr, size);
 
   /* Read oid and version tuples for those objects that are not modified in the transaction */
   int numread = fixed.numread;
@@ -310,7 +313,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
   char objread[size];
   if(numread != 0) { //If pile contains more than one object to be read,
     // keep reading all objects
-    recv_data((int)acceptfd, objread, size);
+    recv_data_buf((int)acceptfd, readbuffer, objread, size);
   }
 
   /* Read modified objects */
@@ -320,7 +323,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
       return 1;
     }
     size = fixed.sum_bytes;
-    recv_data((int)acceptfd, modptr, size);
+    recv_data_buf((int)acceptfd, readbuffer, modptr, size);
   }
 
   /* Create an array of oids for modified objects */
@@ -340,7 +343,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
   }
 
   /*Process the information read */
-  if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
+  if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd, readbuffer)) != 0) {
     printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
     /* Free resources */
     if(oidmod != NULL) {
@@ -361,7 +364,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
  * function and sends a reply to the co-ordinator.
  * Following this it also receives a new control message from the co-ordinator and processes this message*/
 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
-                     unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
+                     unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd, struct readstruct *readbuffer) {
 
   char control, sendctrl, retval;
   objheader_t *tmp_header;
@@ -374,7 +377,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     return 1;
   }
 
-  recv_data((int)acceptfd, &control, sizeof(char));
+  recv_data_buf((int)acceptfd, readbuffer, &control, sizeof(char));
   /* Process the new control message */
   switch(control) {
   case TRANS_ABORT:
@@ -736,7 +739,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
  * If objects are not found then record those and if objects are found
  * then use offset values to prefetch references to other objects */
 
-int prefetchReq(int acceptfd) {
+int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
   int i, size, objsize, numoffset = 0;
   int length;
   char *recvbuffer, control;
@@ -746,10 +749,10 @@ int prefetchReq(int acceptfd) {
   int sd = -1;
 
   while(1) {
-    recv_data((int)acceptfd, &numoffset, sizeof(int));
+    recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int));
     if(numoffset == -1)
       break;
-    recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
+    recv_data_buf((int)acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int));
     oid = oidmid.oid;
     if (mid != oidmid.mid) {
       if (mid!=-1) {
@@ -759,23 +762,24 @@ int prefetchReq(int acceptfd) {
       sd = getSockWithLock(transPResponseSocketPool, mid);
     }
     short offsetarry[numoffset];
-    recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short));
+    recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short));
 
     /*Process each oid */
     if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
       /* Save the oids not found in buffer for later use */
       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
-      char sendbuffer[size];
-      *((int *) sendbuffer) = size;
-      *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
-      *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
-      control = TRANS_PREFETCH_RESPONSE;
-      sendPrefetchResponse(sd, &control, sendbuffer, &size);
+      char sendbuffer[size+1];
+      sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+      *((int *) (sendbuffer+sizeof(char))) = size;
+      *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
+      *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid;
+      send_data(sd, sendbuffer, size+1);
     } else { /* Object Found */
-      int incr = 0;
+      int incr = 1;
       GETSIZE(objsize, header);
       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
-      char sendbuffer[size];
+      char sendbuffer[size+1];
+      sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
       *((int *)(sendbuffer + incr)) = size;
       incr += sizeof(int);
       *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
@@ -783,9 +787,7 @@ int prefetchReq(int acceptfd) {
       *((unsigned int *)(sendbuffer+incr)) = oid;
       incr += sizeof(unsigned int);
       memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
-      control = TRANS_PREFETCH_RESPONSE;
-      sendPrefetchResponse(sd, &control, sendbuffer, &size);
+      send_data(sd, sendbuffer, size+1);
 
       /* Calculate the oid corresponding to the offset value */
       for(i = 0 ; i< numoffset ; i++) {
@@ -809,19 +811,20 @@ int prefetchReq(int acceptfd) {
 
        if((header = mhashSearch(oid)) == NULL) {
          size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
-         char sendbuffer[size];
-         *((int *) sendbuffer) = size;
-         *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
-         *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+         char sendbuffer[size+1];
+         sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+         *((int *) (sendbuffer+1)) = size;
+         *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
+         *((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid;
 
-         control = TRANS_PREFETCH_RESPONSE;
-         sendPrefetchResponse(sd, &control, sendbuffer, &size);
+         send_data(sd, sendbuffer, size+1);
          break;
        } else { /* Obj Found */
-         int incr = 0;
+         int incr = 1;
          GETSIZE(objsize, header);
          size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
-         char sendbuffer[size];
+         char sendbuffer[size+1];
+         sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
          *((int *)(sendbuffer + incr)) = size;
          incr += sizeof(int);
          *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
@@ -829,9 +832,7 @@ int prefetchReq(int acceptfd) {
          *((unsigned int *)(sendbuffer+incr)) = oid;
          incr += sizeof(unsigned int);
          memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
-         control = TRANS_PREFETCH_RESPONSE;
-         sendPrefetchResponse(sd, &control, sendbuffer, &size);
+         send_data(sd, sendbuffer, size+1);
        }
       } //end of for
     }
index de522845df044925c7d4fc6c00c7d99315f3c30f..956dd000c278348872950b533c8def563d62a7ef 100644 (file)
@@ -331,12 +331,12 @@ void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd, unsigned int
   return;
 }
 
-int getRangePrefetchResponse(int sd) {
+int getRangePrefetchResponse(int sd, struct readstruct * readbuffer) {
   int length = 0;
-  recv_data(sd, &length, sizeof(int));
+  recv_data_buf(sd, readbuffer, &length, sizeof(int));
   int size = length - sizeof(int);
   char recvbuffer[size];
-  recv_data(sd, recvbuffer, size);
+  recv_data_buf(sd, readbuffer, recvbuffer, size);
   char control = *((char *) recvbuffer);
   unsigned int oid;
   if(control == OBJECT_FOUND) {
@@ -390,16 +390,16 @@ int getRangePrefetchResponse(int sd) {
   return 0;
 }
 
-int rangePrefetchReq(int acceptfd) {
+int rangePrefetchReq(int acceptfd, struct readstruct * readbuffer) {
   int numoffset, sd = -1;
   unsigned int baseoid, mid = -1;
   oidmidpair_t oidmid;
 
   while (1) {
-    recv_data(acceptfd, &numoffset, sizeof(int));
+    recv_data_buf(acceptfd, readbuffer, &numoffset, sizeof(int));
     if(numoffset == -1)
       break;
-    recv_data(acceptfd, &oidmid, 2*sizeof(unsigned int));
+    recv_data_buf(acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int));
     baseoid = oidmid.oid;
     if(mid != oidmid.mid) {
       if(mid!= -1)
@@ -408,7 +408,7 @@ int rangePrefetchReq(int acceptfd) {
       sd = getSockWithLock(transPResponseSocketPool, mid);
     }
     short offsetsarry[numoffset];
-    recv_data(acceptfd, offsetsarry, numoffset*sizeof(short));
+    recv_data_buf(acceptfd, readbuffer, offsetsarry, numoffset*sizeof(short));
 
     perMcPrefetchList_t * pilehead=processRemote(baseoid, offsetsarry, sd, numoffset);
 
index 08697d57e44a9ef7a723a9c0700c3727c6c71ffc..3e1c2dd91913e44f7851ec8370a972b83bfdeb26 100644 (file)
@@ -2,6 +2,7 @@
 #define _PREFETCH_H_
 #include "queue.h"
 #include "dstm.h"
+#include "readstruct.h"
 
 #define GET_STRIDE(x) ((x & 0x7000) >> 12)
 #define GET_RANGE(x) (x & 0x0fff)
@@ -54,9 +55,9 @@ void insertPrefetch(int, unsigned int, short, short*, perMcPrefetchList_t **);
 
 /******** Sending and Receiving Prefetches *******/
 void sendRangePrefetchReq(perMcPrefetchList_t *, int sd, unsigned int mid);
-int rangePrefetchReq(int acceptfd);
+int rangePrefetchReq(int acceptfd, struct readstruct * readbuffer);
 int processOidFound(objheader_t *, short *, int, int, int);
-int getRangePrefetchResponse(int sd);
+int getRangePrefetchResponse(int sd, struct readstruct *);
 INLINE objheader_t *searchObj(unsigned int);
 
 
index fcb58191864383ea2b65ceacde1c56ab08ae47f8..8ebff8088ffaf771ff9503740297fc88f417b6a6 100644 (file)
@@ -8,6 +8,13 @@ pthread_cond_t qcond;
 
 #define QSIZE 2048 //2 KB
 
+extern char bigarray[16*1024*1024];
+extern int bigindex;
+#define LOGEVENT(x) { \
+    int tmp=bigindex++;                                \
+    bigarray[tmp]=x;                           \
+  }
+
 void queueInit(void) {
   /* Intitialize primary queue */
   headoffset=0;
@@ -58,10 +65,12 @@ void movehead(int size) {
 void * gettail() {
   while(tailoffset==headoffset) {
     //Sleep
-    //    pthread_mutex_lock(&qlock);
-    //    if (tailoffset==headoffset)
-    //      pthread_cond_wait(&qcond, &qlock);
-    //    pthread_mutex_unlock(&qlock);
+    LOGEVENT('T');
+    pthread_mutex_lock(&qlock);
+    if (tailoffset==headoffset)
+      pthread_cond_wait(&qcond, &qlock);
+    pthread_mutex_unlock(&qlock);
+    LOGEVENT('W');
   }
   if (*((int *)(memory+tailoffset))==-1) {
     tailoffset=0; //do loop
index d59cb23aad3f38c826f4c065071e5b1190fa7e83..36465abb341c58df559316fe8dcff1df182f0115 100644 (file)
@@ -79,6 +79,14 @@ void printhex(unsigned char *, int);
 plistnode_t *createPiles();
 plistnode_t *sortPiles(plistnode_t *pileptr);
 
+char bigarray[16*1024*1024];
+int bigindex=0;
+#define LOGEVENT(x) { \
+    int tmp=bigindex++;                                \
+    bigarray[tmp]=x;                           \
+  }
+
+
 /*******************************
 * Send and Recv function calls
 *******************************/
@@ -98,6 +106,87 @@ void send_data(int fd, void *buf, int buflen) {
   }
 }
 
+void recv_data_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
+  char *buf=(char *)buffer;
+
+  int numbytes=readbuffer->head-readbuffer->tail;
+  if (numbytes>buflen)
+    numbytes=buflen;
+  if (numbytes>0) {
+    memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
+    readbuffer->tail+=numbytes;
+    buflen-=numbytes;
+    buf+=numbytes;
+    if (buflen==0) {
+      return;
+    }
+  }
+
+  if (buflen>=MAXBUF) {
+    recv_data(fd, buf, buflen);
+    return;
+  }
+  
+  int maxbuf=MAXBUF;
+  int obufflen=buflen;
+  readbuffer->head=0;
+  
+  while (buflen > 0) {
+    int numbytes = recv(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
+    if (numbytes == -1) {
+      perror("recv");
+      exit(0);
+    }
+    buflen-=numbytes;
+    readbuffer->head+=numbytes;
+    maxbuf-=numbytes;
+  }
+  memcpy(buf,readbuffer->buf,obufflen);
+  readbuffer->tail=obufflen;
+}
+
+int recv_data_errorcode_buf(int fd, struct readstruct * readbuffer, void *buffer, int buflen) {
+  char *buf=(char *)buffer;
+  //now tail<=head
+  int numbytes=readbuffer->head-readbuffer->tail;
+  if (numbytes>buflen)
+    numbytes=buflen;
+  if (numbytes>0) {
+    memcpy(buf, &readbuffer->buf[readbuffer->tail], numbytes);
+    readbuffer->tail+=numbytes;
+    buflen-=numbytes;
+    buf+=numbytes;
+    if (buflen==0)
+      return 1;
+  }
+
+  if (buflen>=MAXBUF) {
+    return recv_data_errorcode(fd, buf, buflen);
+  }
+
+  int maxbuf=MAXBUF;
+  int obufflen=buflen;
+  readbuffer->head=0;
+  
+  while (buflen > 0) {
+    int numbytes = recv(fd, &readbuffer->buf[readbuffer->head], maxbuf, 0);
+    if (numbytes ==0) {
+      return 0;
+    }
+    if (numbytes==-1) {
+      perror("recvbuf");
+      return -1;
+    }
+    buflen-=numbytes;
+    readbuffer->head+=numbytes;
+    maxbuf-=numbytes;
+  }
+  memcpy(buf,readbuffer->buf,obufflen);
+  readbuffer->tail=obufflen;
+  return 1;
+}
+
+
 void recv_data(int fd, void *buf, int buflen) {
   char *buffer = (char *)(buf);
   int size = buflen;
@@ -168,11 +257,17 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof
   /* Allocate for the queue node*/
   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
   int len;
-  char * node= getmemory(qnodesize);
+#ifdef INLINEPREFETCH
+  char node[qnodesize];
+#else
+  char *node=getmemory(qnodesize);
+#endif
   int top=endoffsets[ntuples-1];
 
-  if (node==NULL)
+  if (node==NULL) {
+    LOGEVENT('D');
     return;
+  }
   /* Set queue node values */
 
   /* TODO: Remove this after testing */
@@ -185,8 +280,30 @@ void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endof
   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
 
+#ifdef INLINEPREFETCH
+  prefetchpile_t *pilehead = foundLocal(node);
+
+  if (pilehead!=NULL) {
+    // Get sock from shared pool
+    
+    /* Send  Prefetch Request */
+    prefetchpile_t *ptr = pilehead;
+    while(ptr != NULL) {
+      int sd = getSock2(transPrefetchSockPool, ptr->mid);
+      sendPrefetchReq(ptr, sd);
+      ptr = ptr->next;
+    }
+    
+    /* Release socket */
+    // freeSock(transPrefetchSockPool, pilehead->mid, sd);
+    
+    /* Deallocated pilehead */
+    mcdealloc(pilehead);
+  }
+#else
   /* Lock and insert into primary prefetch queue */
   movehead(qnodesize);
+#endif
 }
 
 /* This function starts up the transaction runtime. */
@@ -302,9 +419,11 @@ void transInit() {
     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
   } while(retval!=0);
 #else
+#ifndef INLINEPREFETCH
   do {
     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
   } while(retval!=0);
+#endif
 #endif
   pthread_detach(tPrefetch);
 #endif
@@ -523,6 +642,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
 #ifdef CACHE
     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
 #ifdef TRANSSTATS
+      LOGEVENT('P')
       nprehashSearch++;
 #endif
       /* Look up in prefetch cache */
@@ -551,6 +671,8 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
       return NULL;
     } else {
 #ifdef TRANSSTATS
+
+      LOGEVENT('R');
       nRemoteSend++;
 #endif
 #ifdef COMPILER
@@ -662,6 +784,13 @@ int transCommit() {
   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
   char finalResponse;
 
+#ifdef TRANSSTATS
+  int iii;
+  for(iii=0;iii<bigindex;iii++) {
+    printf("%c", bigarray[iii]);
+  }
+#endif
+
 #ifdef ABORTREADERS
   if (t_abort) {
     //abort this transaction
@@ -1320,6 +1449,7 @@ prefetchpile_t *foundLocal(char *ptr) {
     unsigned int oid=oidarray[i];
     int newbase;
     int machinenum;
+
     if (oid==0)
       continue;
     //Look up fields locally
@@ -1457,15 +1587,19 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
   objpile_t *tmp;
 
   /* Send TRANS_PREFETCH control message */
-  control = TRANS_PREFETCH;
-  send_data(sd, &control, sizeof(char));
+  int first=1;
 
   /* Send Oids and offsets in pairs */
   tmp = mcpilenode->objpiles;
   while(tmp != NULL) {
     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
-    char oidnoffset[len];
+    char oidnoffset[len+5];
     char *buf=oidnoffset;
+    if (first) {
+      *buf=TRANS_PREFETCH;
+      buf++;len++;
+      first=0;
+    }
     *((int*)buf) = tmp->numoffset;
     buf+=sizeof(int);
     *((unsigned int *)buf) = tmp->oid;
@@ -1476,30 +1610,32 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
     *((unsigned int *)buf) = myIpAddr;
     buf += sizeof(unsigned int);
     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
-    send_data(sd, oidnoffset, len);
     tmp = tmp->next;
+    if (tmp==NULL) {
+      *((int *)(&oidnoffset[len]))=-1;
+      len+=sizeof(int);
+    }
+    send_data(sd, oidnoffset, len);
   }
 
-  /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
-  endpair = -1;
-  send_data(sd, &endpair, sizeof(int));
-
+  LOGEVENT('S');
   return;
 }
 
-int getPrefetchResponse(int sd) {
+int getPrefetchResponse(int sd, struct readstruct *readbuffer) {
   int length = 0, size = 0;
   char control;
   unsigned int oid;
   void *modptr, *oldptr;
 
-  recv_data((int)sd, &length, sizeof(int));
+  recv_data_buf(sd, readbuffer, &length, sizeof(int));
   size = length - sizeof(int);
   char recvbuffer[size];
 #ifdef TRANSSTATS
     getResponse++;
+    LOGEVENT('Z');
 #endif
-  recv_data((int)sd, recvbuffer, size);
+    recv_data_buf(sd, readbuffer, recvbuffer, size);
   control = *((char *) recvbuffer);
   if(control == OBJECT_FOUND) {
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));