bug fix
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
index d65c0df5d08e0d4fe0299c1ed2bde636bf08a872..0cb3490d2e74c5342e3252120fa73a8ca11fda52 100644 (file)
@@ -6,9 +6,13 @@
 #include "mlookup.h"
 #include "llookup.h"
 #include "threadnotify.h"
+#include "prefetch.h"
+#include <sched.h>
 #ifdef COMPILER
 #include "thread.h"
 #endif
+#include "gCollect.h"
+#include "readstruct.h"
 
 #define BACKLOG 10 //max pending connections
 #define RECEIVE_BUFFER_SIZE 2048
@@ -24,7 +28,6 @@ pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a
 
 sockPoolHashTable_t *transPResponseSocketPool;
 
-
 /* This function initializes the main objects store and creates the
  * global machine and location lookup table */
 
@@ -35,7 +38,7 @@ int dstmInit(void) {
   pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
   pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
   pthread_mutex_init(&lockObjHeader,NULL);
-  if (mhashCreate(HASH_SIZE, LOADFACTOR))
+  if (mhashCreate(MHASH_SIZE, MLOADFACTOR))
     return 1;             //failure
 
   if (lhashCreate(HASH_SIZE, LOADFACTOR))
@@ -128,10 +131,13 @@ void *dstmAccept(void *acceptfd) {
   trans_commit_data_t transinfo;
   unsigned short objType, *versionarry, version;
   unsigned int *oidarry, numoid, mid, threadid;
+  struct readstruct readbuffer;
+  readbuffer.head=0;
+  readbuffer.tail=0;
 
   /* Receive control messages from other machines */
   while(1) {
-    int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+    int ret=recv_data_errorcode_buf((int)acceptfd, &readbuffer, &control, sizeof(char));
     if (ret==0)
       break;
     if (ret==-1) {
@@ -141,10 +147,12 @@ void *dstmAccept(void *acceptfd) {
     switch(control) {
     case READ_REQUEST:
       /* Read oid requested and search if available */
-      recv_data((int)acceptfd, &oid, sizeof(unsigned int));
-      if((srcObj = mhashSearch(oid)) == NULL) {
-       printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
-       break;
+      recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
+      while((srcObj = mhashSearch(oid)) == NULL) {
+       int ret;
+       if((ret = sched_yield()) != 0) {
+         printf("%s(): error no %d in thread yield\n", __func__, errno);
+       }
       }
       h = (objheader_t *) srcObj;
       GETSIZE(size, h);
@@ -178,41 +186,55 @@ void *dstmAccept(void *acceptfd) {
       transinfo.modptr = NULL;
       transinfo.numlocked = 0;
       transinfo.numnotfound = 0;
-      if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
+      if((val = readClientReq(&transinfo, (int)acceptfd, &readbuffer)) != 0) {
        printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
        pthread_exit(NULL);
       }
       break;
 
     case TRANS_PREFETCH:
-      if((val = prefetchReq((int)acceptfd)) != 0) {
+#ifdef RANGEPREFETCH
+      if((val = rangePrefetchReq((int)acceptfd, &readbuffer)) != 0) {
+       printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
+       break;
+      }
+#else
+      if((val = prefetchReq((int)acceptfd, &readbuffer)) != 0) {
        printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
        break;
       }
+#endif
       break;
 
     case TRANS_PREFETCH_RESPONSE:
-      if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+#ifdef RANGEPREFETCH
+      if((val = getRangePrefetchResponse((int)acceptfd, &readbuffer)) != 0) {
+       printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
+       break;
+      }
+#else
+      if((val = getPrefetchResponse((int) acceptfd, &readbuffer)) != 0) {
        printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
        break;
       }
+#endif
       break;
 
     case START_REMOTE_THREAD:
-      recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+      recv_data_buf((int)acceptfd, &readbuffer, &oid, sizeof(unsigned int));
       objType = getObjType(oid);
       startDSMthread(oid, objType);
       break;
 
     case THREAD_NOTIFY_REQUEST:
-      recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
+      recv_data_buf((int)acceptfd, &readbuffer, &numoid, sizeof(unsigned int));
       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
       if((buffer = calloc(1,size)) == NULL) {
        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
        pthread_exit(NULL);
       }
 
-      recv_data((int)acceptfd, buffer, size);
+      recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
 
       oidarry = calloc(numoid, sizeof(unsigned int));
       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
@@ -225,7 +247,6 @@ void *dstmAccept(void *acceptfd) {
       threadid = *((unsigned int *)(buffer+size));
       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
       free(buffer);
-
       break;
 
     case THREAD_NOTIFY_RESPONSE:
@@ -235,7 +256,7 @@ void *dstmAccept(void *acceptfd) {
        pthread_exit(NULL);
       }
 
-      recv_data((int)acceptfd, buffer, size);
+      recv_data_buf((int)acceptfd, &readbuffer, buffer, size);
 
       oid = *((unsigned int *)buffer);
       size = sizeof(unsigned int);
@@ -263,7 +284,7 @@ closeconnection:
 
 /* This function reads the information available in a transaction request
  * and makes a function call to process the request */
-int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
+int readClientReq(trans_commit_data_t *transinfo, int acceptfd, struct readstruct * readbuffer) {
   char *ptr;
   void *modptr;
   unsigned int *oidmod, oid;
@@ -277,14 +298,14 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
   size = sizeof(fixed) - 1;
   ptr = (char *)&fixed;;
   fixed.control = TRANS_REQUEST;
-  recv_data((int)acceptfd, ptr+1, size);
+  recv_data_buf((int)acceptfd, readbuffer, ptr+1, size);
 
   /* Read list of mids */
   int mcount = fixed.mcount;
   size = mcount * sizeof(unsigned int);
   unsigned int listmid[mcount];
   ptr = (char *) listmid;
-  recv_data((int)acceptfd, ptr, size);
+  recv_data_buf((int)acceptfd, readbuffer, ptr, size);
 
   /* Read oid and version tuples for those objects that are not modified in the transaction */
   int numread = fixed.numread;
@@ -292,7 +313,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
   char objread[size];
   if(numread != 0) { //If pile contains more than one object to be read,
     // keep reading all objects
-    recv_data((int)acceptfd, objread, size);
+    recv_data_buf((int)acceptfd, readbuffer, objread, size);
   }
 
   /* Read modified objects */
@@ -302,7 +323,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
       return 1;
     }
     size = fixed.sum_bytes;
-    recv_data((int)acceptfd, modptr, size);
+    recv_data_buf((int)acceptfd, readbuffer, modptr, size);
   }
 
   /* Create an array of oids for modified objects */
@@ -322,7 +343,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
   }
 
   /*Process the information read */
-  if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
+  if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd, readbuffer)) != 0) {
     printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
     /* Free resources */
     if(oidmod != NULL) {
@@ -343,7 +364,8 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
  * function and sends a reply to the co-ordinator.
  * Following this it also receives a new control message from the co-ordinator and processes this message*/
 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
-                     unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
+                     unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd, struct readstruct *readbuffer) {
+
   char control, sendctrl, retval;
   objheader_t *tmp_header;
   void *header;
@@ -354,7 +376,8 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
     return 1;
   }
-  recv_data((int)acceptfd, &control, sizeof(char));
+
+  recv_data_buf((int)acceptfd, readbuffer, &control, sizeof(char));
   /* Process the new control message */
   switch(control) {
   case TRANS_ABORT:
@@ -402,6 +425,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     //TODO Use fixed.trans_id  TID since Client may have died
     break;
   }
+
   /* Free memory */
   if (transinfo->objlocked != NULL) {
     free(transinfo->objlocked);
@@ -409,6 +433,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
   if (transinfo->objnotfound != NULL) {
     free(transinfo->objnotfound);
   }
+
   return 0;
 }
 
@@ -446,8 +471,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                                &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
     } else {  //Objs modified
       if(i == fixed->numread) {
-       oidlocked[objlocked] = -1;
-       objlocked++;
+       oidlocked[objlocked++] = -1;
       }
       int tmpsize;
       headptr = (objheader_t *) ptr;
@@ -542,10 +566,10 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
        *numBytes += size;
        /* Send TRANS_DISAGREE to Coordinator */
        *control = TRANS_DISAGREE;
+       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
       }
       //Keep track of oid locked
-      oidlocked[*objlocked] = OID(((objheader_t *)mobj));
-      (*objlocked)++;
+      oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
     } else {  //we are locked
       if (version == ((objheader_t *)mobj)->version) {     /* Check if versions match */
        (*v_matchlock)++;
@@ -558,6 +582,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
        size += sizeof(objheader_t);
        *numBytes += size;
        *control = TRANS_DISAGREE;
+       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
       }
     }
   }
@@ -580,18 +605,17 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
        (*v_matchnolock)++;
       } else { /* If versions don't match ...HARD ABORT */
        (*v_nomatch)++;
-       oidvernotmatch[*objvernotmatch] = oid;
-       (*objvernotmatch)++;
+       oidvernotmatch[(*objvernotmatch)++] = oid;
        int size;
        GETSIZE(size, mobj);
        size += sizeof(objheader_t);
        *numBytes += size;
        /* Send TRANS_DISAGREE to Coordinator */
        *control = TRANS_DISAGREE;
+       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
       }
       //Keep track of oid locked
-      oidlocked[*objlocked] = OID(((objheader_t *)mobj));
-      (*objlocked)++;
+      oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
     } else { /* Some other transaction has aquired a write lock on this object */
       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
        (*v_matchlock)++;
@@ -604,6 +628,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
        size += sizeof(objheader_t);
        *numBytes += size;
        *control = TRANS_DISAGREE;
+       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
       }
     }
   }
@@ -647,7 +672,6 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int
   transinfo->modptr = modptr;
   transinfo->numlocked = *(objlocked);
   transinfo->numnotfound = *(objnotfound);
-
   return control;
 }
 
@@ -668,7 +692,15 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
       return 1;
     }
     GETSIZE(tmpsize,header);
-    memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
+
+    {
+      struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
+      struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
+      dst->type=src->type;
+      dst->___cachedCode___=src->___cachedCode___;
+      dst->___cachedHash___=src->___cachedHash___;
+      memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
+    }
     header->version += 1;
     /* If threads are waiting on this object to be updated, notify them */
     if(header->notifylist != NULL) {
@@ -707,7 +739,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
  * If objects are not found then record those and if objects are found
  * then use offset values to prefetch references to other objects */
 
-int prefetchReq(int acceptfd) {
+int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
   int i, size, objsize, numoffset = 0;
   int length;
   char *recvbuffer, control;
@@ -715,12 +747,11 @@ int prefetchReq(int acceptfd) {
   objheader_t *header;
   oidmidpair_t oidmid;
   int sd = -1;
-
   while(1) {
-    recv_data((int)acceptfd, &numoffset, sizeof(int));
+    recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int));
     if(numoffset == -1)
       break;
-    recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
+    recv_data_buf((int)acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int));
     oid = oidmid.oid;
     if (mid != oidmid.mid) {
       if (mid!=-1) {
@@ -730,23 +761,24 @@ int prefetchReq(int acceptfd) {
       sd = getSockWithLock(transPResponseSocketPool, mid);
     }
     short offsetarry[numoffset];
-    recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short));
+    recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short));
 
     /*Process each oid */
     if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
       /* Save the oids not found in buffer for later use */
       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
-      char sendbuffer[size];
-      *((int *) sendbuffer) = size;
-      *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
-      *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
-      control = TRANS_PREFETCH_RESPONSE;
-      sendPrefetchResponse(sd, &control, sendbuffer, &size);
+      char sendbuffer[size+1];
+      sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+      *((int *) (sendbuffer+sizeof(char))) = size;
+      *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
+      *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid;
+      send_data(sd, sendbuffer, size+1);
     } else { /* Object Found */
-      int incr = 0;
+      int incr = 1;
       GETSIZE(objsize, header);
       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
-      char sendbuffer[size];
+      char sendbuffer[size+1];
+      sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
       *((int *)(sendbuffer + incr)) = size;
       incr += sizeof(int);
       *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
@@ -754,14 +786,12 @@ int prefetchReq(int acceptfd) {
       *((unsigned int *)(sendbuffer+incr)) = oid;
       incr += sizeof(unsigned int);
       memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
-      control = TRANS_PREFETCH_RESPONSE;
-      sendPrefetchResponse(sd, &control, sendbuffer, &size);
+      send_data(sd, sendbuffer, size+1);
 
       /* Calculate the oid corresponding to the offset value */
       for(i = 0 ; i< numoffset ; i++) {
        /* Check for arrays  */
-       if(TYPE(header) > NUMCLASSES) {
+       if(TYPE(header) >= NUMCLASSES) {
          int elementsize = classsize[TYPE(header)];
          struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
          unsigned short length = ao->___length___;
@@ -780,19 +810,20 @@ int prefetchReq(int acceptfd) {
 
        if((header = mhashSearch(oid)) == NULL) {
          size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
-         char sendbuffer[size];
-         *((int *) sendbuffer) = size;
-         *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
-         *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+         char sendbuffer[size+1];
+         sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+         *((int *) (sendbuffer+1)) = size;
+         *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
+         *((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid;
 
-         control = TRANS_PREFETCH_RESPONSE;
-         sendPrefetchResponse(sd, &control, sendbuffer, &size);
+         send_data(sd, sendbuffer, size+1);
          break;
        } else { /* Obj Found */
-         int incr = 0;
+         int incr = 1;
          GETSIZE(objsize, header);
          size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
-         char sendbuffer[size];
+         char sendbuffer[size+1];
+         sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
          *((int *)(sendbuffer + incr)) = size;
          incr += sizeof(int);
          *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
@@ -800,17 +831,14 @@ int prefetchReq(int acceptfd) {
          *((unsigned int *)(sendbuffer+incr)) = oid;
          incr += sizeof(unsigned int);
          memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
-         control = TRANS_PREFETCH_RESPONSE;
-         sendPrefetchResponse(sd, &control, sendbuffer, &size);
+         send_data(sd, sendbuffer, size+1);
        }
-      }
+      } //end of for
     }
-  }
-  //Release socket
+  } //end of while
+    //Release socket
   if (mid!=-1)
     freeSockWithLock(transPResponseSocketPool, mid, sd);
-
   return 0;
 }