small changes
authorbdemsky <bdemsky>
Thu, 27 Mar 2008 07:51:54 +0000 (07:51 +0000)
committerbdemsky <bdemsky>
Thu, 27 Mar 2008 07:51:54 +0000 (07:51 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 6c893cfc3f7f0a48a1bce12420b514e110667025..f7478881487e7feea66a4704faa909049cf9e669 100644 (file)
@@ -42,6 +42,7 @@
 #define THREAD_NOTIFY_REQUEST          24
 #define THREAD_NOTIFY_RESPONSE         25
 #define TRANS_UNSUCESSFUL              26
+#define CLOSE_CONNECTION                27
 
 //Max number of objects 
 #define MAX_OBJECTS  20
@@ -200,6 +201,7 @@ typedef struct midSocketInfo {
 int dstmInit(void);
 void send_data(int fd, void *buf, int buflen);
 void recv_data(int fd, void *buf, int buflen);
+int recv_data_errorcode(int fd, void *buf, int buflen);
 
 /* Prototypes for object header */
 unsigned int getNewOID(void);
index 08baee501f00b67556f1d2096592b38641afbbff..64b1c8bc0d45f06226d2c0a6dd758662da561850 100644 (file)
@@ -126,148 +126,145 @@ void *dstmListen()
 }
 /* This function accepts a new connection request, decodes the control message in the connection 
  * and accordingly calls other functions to process new requests */
-void *dstmAccept(void *acceptfd)
-{
-       int val, retval, size, sum, sockid;
-       unsigned int oid;
-       char *buffer;
-       char control,ctrl;
-       char *ptr;
-       void *srcObj;
-       objheader_t *h;
-       trans_commit_data_t transinfo;
-       unsigned short objType, *versionarry, version;
-       unsigned int *oidarry, numoid, mid, threadid;
-       
-       transinfo.objlocked = NULL;
-       transinfo.objnotfound = NULL;
-       transinfo.modptr = NULL;
-       transinfo.numlocked = 0;
-       transinfo.numnotfound = 0;
-
-       /* Receive control messages from other machines */
-       recv_data((int)acceptfd, &control, sizeof(char));
-
-       switch(control) {
-               case READ_REQUEST:
-            do {
-                /* Read oid requested and search if available */
-                recv_data((int)acceptfd, &oid, sizeof(unsigned int));
-                if((srcObj = mhashSearch(oid)) == NULL) {
-                    printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
-                    break;
-                }
-                h = (objheader_t *) srcObj;
-                GETSIZE(size, h);
-                size += sizeof(objheader_t);
-                sockid = (int) acceptfd;
-
-                if (h == NULL) {
-                    ctrl = OBJECT_NOT_FOUND;
-                    send_data(sockid, &ctrl, sizeof(char));
-                } else {
-                    /* Type */
-                    char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
-                    *((int *)&msg[1])=size;
-                    send_data(sockid, &msg, sizeof(msg));
-                    send_data(sockid, h, size);
-                }
-                               recv_data((int)acceptfd, &control, sizeof(char));
-            } while(control == READ_REQUEST);
-                       break;
-               
-               case READ_MULT_REQUEST:
-                       break;
-       
-               case MOVE_REQUEST:
-                       break;
-
-               case MOVE_MULT_REQUEST:
-                       break;
-
-               case TRANS_REQUEST:
-                       /* Read transaction request */
-                       if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
-                               printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
-                               pthread_exit(NULL);
-                       }
-                       break;
-               case TRANS_PREFETCH:
-                       do {
-                               if((val = prefetchReq((int)acceptfd)) != 0) {
-                                       printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
-                                       break;
-                               }
-                               recv_data((int)acceptfd, &control, sizeof(char));
-                       } while (control == TRANS_PREFETCH);
-                       break;
-               case TRANS_PREFETCH_RESPONSE:
-                       do {
-                               if((val = getPrefetchResponse((int) acceptfd)) != 0) {
-                                       printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
-                                       break;
-                               }
-                               recv_data((int)acceptfd, &control, sizeof(char));
-                       } while (control == TRANS_PREFETCH_RESPONSE);
-                       break;
-               case START_REMOTE_THREAD:
-                       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
-                       objType = getObjType(oid);
-                       startDSMthread(oid, objType);
-                       break;
-
-               case THREAD_NOTIFY_REQUEST:
-                       recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
-                       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
-                       if((buffer = calloc(1,size)) == NULL) {
-                               printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
-                               pthread_exit(NULL);
-                       }
-
-                       recv_data((int)acceptfd, buffer, size);
-
-                       oidarry = calloc(numoid, sizeof(unsigned int)); 
-                       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
-                       size = sizeof(unsigned int) * numoid;
-                       versionarry = calloc(numoid, sizeof(unsigned short));
-                       memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
-                       size += sizeof(unsigned short) * numoid;
-                       mid = *((unsigned int *)(buffer+size));
-                       size += sizeof(unsigned int);
-                       threadid = *((unsigned int *)(buffer+size));
-                       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
-                       free(buffer);
-
-                       break;
-
-               case THREAD_NOTIFY_RESPONSE:
-                       size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
-                       if((buffer = calloc(1,size)) == NULL) {
-                               printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
-                               pthread_exit(NULL);
-                       }
-
-                       recv_data((int)acceptfd, buffer, size);
-
-                       oid = *((unsigned int *)buffer);
-                       size = sizeof(unsigned int);
-                       version = *((unsigned short *)(buffer+size));
-                       size += sizeof(unsigned short);
-                       threadid = *((unsigned int *)(buffer+size));
-                       threadNotify(oid,version,threadid);
-                       free(buffer);
-
-                       break;
-               default:
-                       printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
-       }
-
-       /* Close connection */
-       if (close((int)acceptfd) == -1)
-               perror("close");
+void *dstmAccept(void *acceptfd) {
+  int val, retval, size, sum, sockid;
+  unsigned int oid;
+  char *buffer;
+  char control,ctrl;
+  char *ptr;
+  void *srcObj;
+  objheader_t *h;
+  trans_commit_data_t transinfo;
+  unsigned short objType, *versionarry, version;
+  unsigned int *oidarry, numoid, mid, threadid;
+  
+  transinfo.objlocked = NULL;
+  transinfo.objnotfound = NULL;
+  transinfo.modptr = NULL;
+  transinfo.numlocked = 0;
+  transinfo.numnotfound = 0;
+  
+  /* Receive control messages from other machines */
+  while(true) {
+    int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
+    if (ret==-1)
+      break;
+    switch(control) {
+    case READ_REQUEST:
+      /* Read oid requested and search if available */
+      recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+      if((srcObj = mhashSearch(oid)) == NULL) {
+       printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
+       break;
+      }
+      h = (objheader_t *) srcObj;
+      GETSIZE(size, h);
+      size += sizeof(objheader_t);
+      sockid = (int) acceptfd;
+      
+      if (h == NULL) {
+       ctrl = OBJECT_NOT_FOUND;
+       send_data(sockid, &ctrl, sizeof(char));
+      } else {
+       /* Type */
+       char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+       *((int *)&msg[1])=size;
+       send_data(sockid, &msg, sizeof(msg));
+       send_data(sockid, h, size);
+      }
+      break;
+      
+    case READ_MULT_REQUEST:
+      break;
+      
+    case MOVE_REQUEST:
+      break;
+      
+    case MOVE_MULT_REQUEST:
+      break;
+      
+    case TRANS_REQUEST:
+      /* Read transaction request */
+      if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
+       printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
+       pthread_exit(NULL);
+      }
+      break;
+    case TRANS_PREFETCH:
+      if((val = prefetchReq((int)acceptfd)) != 0) {
+       printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
+       break;
+      }
+      break;
+    case TRANS_PREFETCH_RESPONSE:
+      if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+       printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+       break;
+      }
+      break;
+    case START_REMOTE_THREAD:
+      recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+      objType = getObjType(oid);
+      startDSMthread(oid, objType);
+      break;
+      
+    case THREAD_NOTIFY_REQUEST:
+      recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
+      size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
+      if((buffer = calloc(1,size)) == NULL) {
+       printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
        pthread_exit(NULL);
+      }
+      
+      recv_data((int)acceptfd, buffer, size);
+      
+      oidarry = calloc(numoid, sizeof(unsigned int)); 
+      memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
+      size = sizeof(unsigned int) * numoid;
+      versionarry = calloc(numoid, sizeof(unsigned short));
+      memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
+      size += sizeof(unsigned short) * numoid;
+      mid = *((unsigned int *)(buffer+size));
+      size += sizeof(unsigned int);
+      threadid = *((unsigned int *)(buffer+size));
+      processReqNotify(numoid, oidarry, versionarry, mid, threadid);
+      free(buffer);
+      
+      break;
+
+    case THREAD_NOTIFY_RESPONSE:
+      size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
+      if((buffer = calloc(1,size)) == NULL) {
+       printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+       pthread_exit(NULL);
+      }
+      
+      recv_data((int)acceptfd, buffer, size);
+      
+      oid = *((unsigned int *)buffer);
+      size = sizeof(unsigned int);
+      version = *((unsigned short *)(buffer+size));
+      size += sizeof(unsigned short);
+      threadid = *((unsigned int *)(buffer+size));
+      threadNotify(oid,version,threadid);
+      free(buffer);
+      break;
+
+    case CLOSE_CONNECTION:
+      goto closeconnection;
+
+    default:
+      printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
+    }
+  }
+
+ closeconnection:
+  /* Close connection */
+  if (close((int)acceptfd) == -1)
+    perror("close");
+  pthread_exit(NULL);
 }
-
+  
 /* This function reads the information available in a transaction request
  * and makes a function call to process the request */
 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
index 8524cae9c58fd618e2a3fc3b928d2cb3394400ca..66e524baed591614eef369ca87a9912efb429cf2 100644 (file)
@@ -95,6 +95,21 @@ void recv_data(int fd , void *buf, int buflen) {
        }
 }
 
+int recv_data_errorcode(int fd , void *buf, int buflen) {
+  char *buffer = (char *)(buf); 
+  int size = buflen;
+  int numbytes; 
+  while (size > 0) {
+    numbytes = recv(fd, buffer, size, 0);
+    if (numbytes == -1) {
+      return -1;
+    }
+    buffer += numbytes;
+    size -= numbytes;
+  }
+  return 0;
+}
+
 void printhex(unsigned char *ptr, int numBytes)
 {
        int i;