completed and tested socket reuse code for trans read and trans prefetch() messages
authoradash <adash>
Wed, 26 Mar 2008 18:18:02 +0000 (18:18 +0000)
committeradash <adash>
Wed, 26 Mar 2008 18:18:02 +0000 (18:18 +0000)
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 672a955c6644129f2aa68786ba907b1be50f7cf9..08baee501f00b67556f1d2096592b38641afbbff 100644 (file)
@@ -150,27 +150,30 @@ void *dstmAccept(void *acceptfd)
 
        switch(control) {
                case READ_REQUEST:
-                       /* Read oid requested and search if available */
-                       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
-                       if((srcObj = mhashSearch(oid)) == NULL) {
-                               printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
-                               pthread_exit(NULL);
-                       }
-                       h = (objheader_t *) srcObj;
-                       GETSIZE(size, h);
-                       size += sizeof(objheader_t);
-                       sockid = (int) acceptfd;
-
-                       if (h == NULL) {
-                               ctrl = OBJECT_NOT_FOUND;
-                               send_data(sockid, &ctrl, sizeof(char));
-                       } else {
-                               /* Type */
-                               char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
-                               *((int *)&msg[1])=size;
-                               send_data(sockid, &msg, sizeof(msg));
-                               send_data(sockid, h, size);
-                       }
+            do {
+                /* Read oid requested and search if available */
+                recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+                if((srcObj = mhashSearch(oid)) == NULL) {
+                    printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
+                    break;
+                }
+                h = (objheader_t *) srcObj;
+                GETSIZE(size, h);
+                size += sizeof(objheader_t);
+                sockid = (int) acceptfd;
+
+                if (h == NULL) {
+                    ctrl = OBJECT_NOT_FOUND;
+                    send_data(sockid, &ctrl, sizeof(char));
+                } else {
+                    /* Type */
+                    char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+                    *((int *)&msg[1])=size;
+                    send_data(sockid, &msg, sizeof(msg));
+                    send_data(sockid, h, size);
+                }
+                               recv_data((int)acceptfd, &control, sizeof(char));
+            } while(control == READ_REQUEST);
                        break;
                
                case READ_MULT_REQUEST:
@@ -199,12 +202,13 @@ void *dstmAccept(void *acceptfd)
                        } while (control == TRANS_PREFETCH);
                        break;
                case TRANS_PREFETCH_RESPONSE:
-                       //do {
+                       do {
                                if((val = getPrefetchResponse((int) acceptfd)) != 0) {
                                        printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
-                                       pthread_exit(NULL);
+                                       break;
                                }
-                       //} while (control == TRANS_PREFETCH_RESPONSE);
+                               recv_data((int)acceptfd, &control, sizeof(char));
+                       } while (control == TRANS_PREFETCH_RESPONSE);
                        break;
                case START_REMOTE_THREAD:
                        recv_data((int)acceptfd, &oid, sizeof(unsigned int));
@@ -643,10 +647,10 @@ int prefetchReq(int acceptfd) {
                        }
                        memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
                        free(recvbuffer);
-#if 0
                        pthread_mutex_lock(&sockLock);
                        sockIdFound = 0;
                        pthread_mutex_unlock(&sockLock);
+            /* If socket is already established then send data reusing socket */
                        for(i = 0; i < NUM_MACHINES; i++) {
                                if(sockArray[i].mid == mid) {
                                        sd = sockArray[i].sockid;
@@ -659,8 +663,6 @@ int prefetchReq(int acceptfd) {
 
                        if(sockIdFound == 0) {
                                if(sockCount < NUM_MACHINES) {
-
-#endif
                                        /* Create socket to send information */
                                        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
                                                perror("prefetchReq():socket()");
@@ -672,13 +674,12 @@ int prefetchReq(int acceptfd) {
                                        remoteAddr.sin_addr.s_addr = htonl(mid);
 
                                        if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+                                               perror("connect");
                                                printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
                                                                inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
                                                close(sd);
                                                return -1;
                                        }
-
-#if 0
                                        sockArray[sockCount].mid = mid;
                                        sockArray[sockCount].sockid = sd;
                                        pthread_mutex_lock(&sockLock);
@@ -690,7 +691,6 @@ int prefetchReq(int acceptfd) {
                                        return -1;
                                }
                        }
-#endif
 
                        /*Process each oid */
                        if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
@@ -812,7 +812,6 @@ int prefetchReq(int acceptfd) {
                                }
                                free(offsetarry);
                        }
-                       close(sd);
                }
        } while (length != -1);
        return 0;
index da01610c871620a89e3e8c8494bdee502697bf9a..8524cae9c58fd618e2a3fc3b928d2cb3394400ca 100644 (file)
@@ -48,13 +48,16 @@ unsigned int oidsPerBlock;
 unsigned int oidMin;
 unsigned int oidMax;
 
-/************************************************************
+/************************************************************************
  * Global variables to map socketid and remote mid to
- * reuse sockets
- ***********************************************************/
+ * reuse sockets for sending prefetches and making remote read requests
+ ************************************************************************/
 midSocketInfo_t midSocketArray[NUM_MACHINES];
 int sockCount;                 //number of connections with all remote machines(one socket per mc)
 int sockIdFound;       //track if socket file descriptor is already established 
+midSocketInfo_t sockArrayRemoteRead[NUM_MACHINES];
+int sockCountRemoteRead;               //number of connections with all remote machines(one socket per mc)
+int sockIdFoundRemoteRead;     //track if socket file descriptor is already established 
 
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
@@ -70,7 +73,6 @@ void send_data(int fd , void *buf, int buflen) {
                numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
                if (numbytes == -1) {
                        perror("send");
-                       printf("error: at %s, %d\n", __FILE__, __LINE__);
                        exit(-1);
                }
                buffer += numbytes;
@@ -86,7 +88,6 @@ void recv_data(int fd , void *buf, int buflen) {
                numbytes = recv(fd, buffer, size, 0);
                if (numbytes == -1) {
                        perror("recv");
-                       printf("error: at %s, %d\n", __FILE__, __LINE__);
                        exit(-1);
                }
                buffer += numbytes;
@@ -653,7 +654,6 @@ int transCommit(transrecord_t *record) {
                printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
                exit(-1);
        }
-
        return 0;
 }
 
@@ -861,24 +861,43 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        objheader_t *h;
        void *objcopy;
 
-       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-               perror("Error in socket\n");
-               return NULL;
-       }
-
-       bzero((char*) &serv_addr, sizeof(serv_addr));
-       serv_addr.sin_family = AF_INET;
-       serv_addr.sin_port = htons(LISTEN_PORT);
-       midtoIP(mnum,machineip);
-       machineip[15] = '\0';
-       serv_addr.sin_addr.s_addr = inet_addr(machineip);
-
-       // Open connection 
-       if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
-               perror("getRemoteObj() Error in connect\n");
-               return NULL;
-       }
-
+    int i;
+    for(i = 0; i < NUM_MACHINES; i++) {
+        if(sockArrayRemoteRead[i].mid == mnum) {
+            sd = sockArrayRemoteRead[i].sockid;
+            sockIdFoundRemoteRead = 1;
+            break;
+        }
+    }
+    
+    if(sockIdFoundRemoteRead == 0) {
+        if(sockCountRemoteRead < NUM_MACHINES) {
+            /* Create socket */
+            if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+                perror("Error in socket\n");
+                return NULL;
+            }
+
+            bzero((char*) &serv_addr, sizeof(serv_addr));
+            serv_addr.sin_family = AF_INET;
+            serv_addr.sin_port = htons(LISTEN_PORT);
+            serv_addr.sin_addr.s_addr = htonl(mnum);
+            // Open connection 
+            if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+                perror("getRemoteObj() Error in connect\n");
+                close(sd);
+                return NULL;
+            }
+            sockArrayRemoteRead[sockCountRemoteRead].mid = mnum;
+            sockArrayRemoteRead[sockCountRemoteRead].sockid = sd;
+            sockCountRemoteRead++;
+        } else {
+            //TODO Fix for connecting to more than 2 machines && close socket
+            printf("%s(): Error: Currently works for two remote machines\n", __func__);
+            return NULL;
+        }
+    }
+    
        char readrequest[sizeof(char)+sizeof(unsigned int)];
        readrequest[0] = READ_REQUEST;
        *((unsigned int *)(&readrequest[1])) = oid;
@@ -904,8 +923,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
                        return NULL;
        }
 
-       //Close connection 
-       close(sd);
        return objcopy;
 }