Added send_data() and recv_data() methods for send() and recv()
authoradash <adash>
Fri, 21 Mar 2008 21:20:16 +0000 (21:20 +0000)
committeradash <adash>
Fri, 21 Mar 2008 21:20:16 +0000 (21:20 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 09684f9c7e9c541df7efba38728d59626e4070cc..6c893cfc3f7f0a48a1bce12420b514e110667025 100644 (file)
@@ -5,13 +5,16 @@
 #define MSG_NOSIGNAL 0
 #endif
 
+/***********************************************************
+ *       Macros
+ **********************************************************/
 #define GET_NTUPLES(x)         ((int *)(x + sizeof(prefetchqelem_t)))
 #define GET_PTR_OID(x)         ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
 #define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
 #define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
-
-
-//Coordinator Messages
+/*****************************************
+ *  Coordinator Messages
+ ***************************************/
 #define READ_REQUEST           1
 #define READ_MULT_REQUEST      2
 #define MOVE_REQUEST           3
@@ -22,7 +25,9 @@
 #define TRANS_PREFETCH         8
 #define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING   9
 
-//Participant Messages
+/*********************************
+ * Participant Messages
+ *******************************/
 #define OBJECT_FOUND                   10
 #define OBJECT_NOT_FOUND               11
 #define OBJECTS_FOUND                  12
 #define THREAD_NOTIFY_RESPONSE         25
 #define TRANS_UNSUCESSFUL              26
 
-//Control bits for status of objects in Machine pile
-#define OBJ_LOCKED_BUT_VERSION_MATCH   14
-#define OBJ_UNLOCK_BUT_VERSION_MATCH   15
-#define VERSION_NO_MATCH               16
-
 //Max number of objects 
 #define MAX_OBJECTS  20
+//Max remote-machine connections
+#define NUM_MACHINES 2
+#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
+//Transaction id per machine
+#define TID_LEN 20
 
 
 #include <stdlib.h>
@@ -57,8 +62,6 @@
 #include "threadnotify.h"
 
 
-#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
-#define TID_LEN 20
 //bit designations for status field of objheader
 #define DIRTY 0x01
 #define NEW   0x02
@@ -127,6 +130,7 @@ typedef struct transrecord {
   struct ___Object___ * revertlist;
 #endif
 } transrecord_t;
+
 // Structure is a shared structure that keeps track of responses from the participants
 typedef struct thread_response {
   char rcv_status;
@@ -188,12 +192,14 @@ typedef struct local_thread_data_array {
 
 //Structure to store mid and socketid information
 typedef struct midSocketInfo {
-       unsigned int mid;
+       unsigned int mid;               /* To communicate with mid use sockid in this data structure*/
        int sockid;
 } midSocketInfo_t;
 
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
+void send_data(int fd, void *buf, int buflen);
+void recv_data(int fd, void *buf, int buflen);
 
 /* Prototypes for object header */
 unsigned int getNewOID(void);
@@ -229,13 +235,13 @@ void mapObjMethod(unsigned short);
 void randomdelay();
 transrecord_t *transStart();
 objheader_t *transRead(transrecord_t *, unsigned int);
-objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid
+objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid header
 int transCommit(transrecord_t *record); //return 0 if successful
 void *transRequest(void *);    //the C routine that the thread will execute when TRANS_REQUEST begins
 void decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant
 char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants
-void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
-void *handleLocalReq(void *);
+void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);// returns object header from main object store after object is copied into it from remote machine
+void *handleLocalReq(void *);//handles Local requests 
 int transComProcess(local_thread_data_array_t *);
 int transAbortProcess(local_thread_data_array_t *);
 void transAbort(transrecord_t *trans);
@@ -244,8 +250,8 @@ void prefetch(int, unsigned int *, unsigned short *, short*);
 void *transPrefetch(void *);
 void *mcqProcess(void *);
 void checkPrefetchTuples(prefetchqelem_t *);
-prefetchpile_t *foundLocal(prefetchqelem_t *);
-prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);
+prefetchpile_t *foundLocal(prefetchqelem_t *);// returns node with prefetch elements(oids, offsets)
+prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);// returns node with prefetch elements(oids, offsets)
 void checkPreCache(prefetchqelem_t *, int *, unsigned int, int);
 int transPrefetchProcess(transrecord_t *, int **, short);
 void sendPrefetchReq(prefetchpile_t*, int);
index e76ab19fb05636a927db6b4fd5317ded0ffe30cc..672a955c6644129f2aa68786ba907b1be50f7cf9 100644 (file)
@@ -27,6 +27,14 @@ extern int classsize[];
 objstr_t *mainobjstore;
 pthread_mutex_t mainobjstore_mutex;
 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
+/**********************************************************
+ * Global variables to map socketid and remote mid
+ * to  resuse sockets
+ **************************************************/
+midSocketInfo_t sockArray[NUM_MACHINES];
+int sockCount; //number of connections with all remote machines(one socket per mc)
+int sockIdFound; //track if socket file descriptor is already established
+pthread_mutex_t sockLock = PTHREAD_MUTEX_INITIALIZER; //lock to prevent global sock variables to be inconsistent
 
 /* This function initializes the main objects store and creates the 
  * global machine and location lookup table */
@@ -47,6 +55,14 @@ int dstmInit(void)
        if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
                return 1; //failure
        
+       //Initialize mid to socketid mapping array
+       int t;
+       sockCount = 0;
+       for(t = 0; t < NUM_MACHINES; t++) {
+               sockArray[t].mid = 0;
+               sockArray[t].sockid = 0;
+       }
+
        return 0;
 }
 
@@ -112,7 +128,7 @@ void *dstmListen()
  * and accordingly calls other functions to process new requests */
 void *dstmAccept(void *acceptfd)
 {
-       int val, retval, size, sum;
+       int val, retval, size, sum, sockid;
        unsigned int oid;
        char *buffer;
        char control,ctrl;
@@ -130,18 +146,12 @@ void *dstmAccept(void *acceptfd)
        transinfo.numnotfound = 0;
 
        /* Receive control messages from other machines */
-       if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
-               printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
-               pthread_exit(NULL);
-       }
-       
+       recv_data((int)acceptfd, &control, sizeof(char));
+
        switch(control) {
                case READ_REQUEST:
                        /* Read oid requested and search if available */
-                       if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
-                               perror("Error: receiving 0x0 object from cooridnator\n");
-                               pthread_exit(NULL);
-                       }
+                       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
                        if((srcObj = mhashSearch(oid)) == NULL) {
                                printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
                                pthread_exit(NULL);
@@ -149,25 +159,17 @@ void *dstmAccept(void *acceptfd)
                        h = (objheader_t *) srcObj;
                        GETSIZE(size, h);
                        size += sizeof(objheader_t);
+                       sockid = (int) acceptfd;
 
                        if (h == NULL) {
                                ctrl = OBJECT_NOT_FOUND;
-                               if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-                                       perror("Error sending control msg to coordinator\n");
-                                       pthread_exit(NULL);
-                               }
+                               send_data(sockid, &ctrl, sizeof(char));
                        } else {
                                /* Type */
                                char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
                                *((int *)&msg[1])=size;
-                               if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
-                                       perror("Error sending size of object to coordinator\n");
-                                       pthread_exit(NULL);
-                               }
-                               if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
-                                       perror("Error in sending object\n");
-                                       pthread_exit(NULL);
-                               }
+                               send_data(sockid, &msg, sizeof(msg));
+                               send_data(sockid, h, size);
                        }
                        break;
                
@@ -191,51 +193,34 @@ void *dstmAccept(void *acceptfd)
                        do {
                                if((val = prefetchReq((int)acceptfd)) != 0) {
                                        printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
-                                       pthread_exit(NULL);
-                               }
-
-                               if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) < 0) {
-                                       printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
-                                       pthread_exit(NULL);
-                               } else if(retval == 0) {
-                                       printf("%s() Error: socket closed at the requesting side\n");
-                                       pthread_exit(NULL);
+                                       break;
                                }
-
+                               recv_data((int)acceptfd, &control, sizeof(char));
                        } while (control == TRANS_PREFETCH);
-
                        break;
                case TRANS_PREFETCH_RESPONSE:
-                       if((val = getPrefetchResponse((int) acceptfd)) != 0) {
-                               printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
-                               pthread_exit(NULL);
-                       }
+                       //do {
+                               if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+                                       printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+                                       pthread_exit(NULL);
+                               }
+                       //} while (control == TRANS_PREFETCH_RESPONSE);
                        break;
                case START_REMOTE_THREAD:
-                       retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
-                       if (retval <= 0)
-                               perror("dstmAccept(): error receiving START_REMOTE_THREAD msg");
-                       else if (retval != sizeof(unsigned int))
-                               printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD %s, %d\n",
-                                       retval, __FILE__, __LINE__);
-                       else
-                       {
-                               objType = getObjType(oid);
-                               startDSMthread(oid, objType);
-                       }
+                       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+                       objType = getObjType(oid);
+                       startDSMthread(oid, objType);
                        break;
 
                case THREAD_NOTIFY_REQUEST:
-                       retval = recv((int)acceptfd, &numoid, sizeof(unsigned int), 0);
+                       recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
                        size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
                        if((buffer = calloc(1,size)) == NULL) {
                                printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
                                pthread_exit(NULL);
                        }
-                       sum = 0;
-                       do {
-                               sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
-                       } while(sum < size);
+
+                       recv_data((int)acceptfd, buffer, size);
 
                        oidarry = calloc(numoid, sizeof(unsigned int)); 
                        memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
@@ -258,10 +243,7 @@ void *dstmAccept(void *acceptfd)
                                pthread_exit(NULL);
                        }
 
-                       sum = 0;
-                       do {
-                               sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
-                       } while(sum < size);
+                       recv_data((int)acceptfd, buffer, size);
 
                        oid = *((unsigned int *)buffer);
                        size = sizeof(unsigned int);
@@ -290,41 +272,30 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
        unsigned int *oidmod, oid;
        fixed_data_t fixed;
        objheader_t *headaddr;
-       int sum = 0, i, N, n, val;
+       int sum, i, size, n, val;
 
        oidmod = NULL;
 
        /* Read fixed_data_t data structure */ 
-       N = sizeof(fixed) - 1;
+       size = sizeof(fixed) - 1;
        ptr = (char *)&fixed;;
        fixed.control = TRANS_REQUEST;
-       do {
-               n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
-               sum += n;
-       } while(sum < N && n != 0); 
+       recv_data((int)acceptfd, ptr+1, size);
 
        /* Read list of mids */
        int mcount = fixed.mcount;
-       N = mcount * sizeof(unsigned int);
+       size = mcount * sizeof(unsigned int);
        unsigned int listmid[mcount];
        ptr = (char *) listmid;
-       sum = 0;
-       do {
-               n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
-               sum += n;
-       } while(sum < N && n != 0);
-
+       recv_data((int)acceptfd, ptr, size);
+       
        /* Read oid and version tuples for those objects that are not modified in the transaction */
        int numread = fixed.numread;
-       N = numread * (sizeof(unsigned int) + sizeof(unsigned short));
-       char objread[N];
+       size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
+       char objread[size];
        if(numread != 0) { //If pile contains more than one object to be read, 
                          // keep reading all objects
-               sum = 0;
-               do {
-                       n = recv((int)acceptfd, (void *) objread+sum, N-sum, 0);
-                       sum += n;
-               } while(sum < N && n != 0);
+               recv_data((int)acceptfd, objread, size);        
        }
        
        /* Read modified objects */
@@ -333,11 +304,8 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
                        printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
                        return 1;
                }
-               sum = 0;
-               do { // Recv the objs that are modified by the Coordinator
-                       n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
-                       sum += n;
-               } while (sum < fixed.sum_bytes && n != 0);
+               size = fixed.sum_bytes;
+               recv_data((int)acceptfd, modptr, size); 
        }
 
        /* Create an array of oids for modified objects */
@@ -391,10 +359,8 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                return 1;
        }
 
-       do {
-               retval = recv((int)acceptfd, &control, sizeof(char), 0);
-       } while(retval < sizeof(char));
-
+       recv_data((int)acceptfd, &control, sizeof(char));
+       
        /* Process the new control message */
        switch(control) {
                case TRANS_ABORT:
@@ -411,17 +377,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
 
                        /* Send ack to Coordinator */
                        sendctrl = TRANS_UNSUCESSFUL;
-                       if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-                               perror("Error: In sending ACK to coordinator\n");
-                               if (transinfo->objlocked != NULL) {
-                                       free(transinfo->objlocked);
-                               }
-                               if (transinfo->objnotfound != NULL) {
-                                       free(transinfo->objnotfound);
-                               }
-
-                               return 1;
-                       }
+                       send_data((int)acceptfd, &sendctrl, sizeof(char));
                        break;
 
                case TRANS_COMMIT:
@@ -522,10 +478,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                                                }
                                                free(oidlocked);
                                        }
-                                       if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-                                               perror("Error in sending control to the Coordinator\n");
-                                               return 0;
-                                       }
+                                       send_data(acceptfd, &control, sizeof(char));
                                        return control;
                                }
                        } else {/* If Obj is not locked then lock object */
@@ -550,11 +503,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                                        }
 
                                        /* Send TRANS_DISAGREE to Coordinator */
-                                       if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-                                               perror("Error in sending control to the Coordinator\n");
-                                               return 0;
-                                       }
-                                       
+                                       send_data(acceptfd, &control, sizeof(char));
                                        return control;
                                }
                        }
@@ -583,34 +532,22 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int
        if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
                control = TRANS_AGREE;
                /* Send control message */
-               if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-                       perror("Error in sending control to Coordinator\n");
-                       return 0;
-               }
+               send_data(acceptfd, &control, sizeof(char));
        }
        /* Condition to send TRANS_SOFT_ABORT */
        if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
                control = TRANS_SOFT_ABORT;
 
                /* Send control message */
-               if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
-                       perror("Error in sending TRANS_SOFT_ABORT control\n");
-                       return 0;
-               }
-
+               send_data(acceptfd, &control, sizeof(char));
+       
                /* Send number of oids not found and the missing oids if objects are missing in the machine */
                if(*(objnotfound) != 0) { 
                        int msg[1];
                        msg[0] = *(objnotfound);
-                       if((val = send(acceptfd, msg, sizeof(int) ,MSG_NOSIGNAL)) < sizeof(int)) {
-                               perror("Error in sending objects that are not found\n");
-                               return 0;
-                       }
+                       send_data(acceptfd, &msg, sizeof(int));
                        int size = sizeof(unsigned int)* *(objnotfound);
-                       if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
-                               perror("Error in sending objects that are not found\n");
-                               return 0;
-                       }
+                       send_data(acceptfd, oidnotfound, size);
                }
        }
 
@@ -668,11 +605,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
 
        /* Send ack to coordinator */
        control = TRANS_SUCESSFUL;
-       if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-               perror("Error sending ACK to coordinator\n");
-               return 1;
-       }
-       
+       send_data((int)acceptfd, &control, sizeof(char));
        return 0;
 }
 
@@ -683,27 +616,22 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
 
 int prefetchReq(int acceptfd) {
        int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
-       int length, sd;
+       int length, sd = -1;
        char *recvbuffer, *sendbuffer, control;
        unsigned int oid, mid;
        short *offsetarry;
        objheader_t *header;
        struct sockaddr_in remoteAddr;
 
-       while((numbytes = recv((int)acceptfd, &length, sizeof(int), 0)) != 0) {
-               if(length == -1) { //-1 is special character to represent end of sending oids and offsets
-                       break;
-               } else {
-                       numbytes = 0;
+       do {
+               recv_data((int)acceptfd, &length, sizeof(int));
+               if(length != -1) {
                        size = length - sizeof(int);
                        if((recvbuffer = calloc(1, size)) == NULL) {
                                printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
                                return -1;
                        }
-                       while(numbytes < size) {
-                               numbytes += recv((int)acceptfd, recvbuffer+numbytes, size-numbytes, 0);
-                       }
-                       
+                       recv_data((int)acceptfd, recvbuffer, size);
                        oid = *((unsigned int *) recvbuffer);
                        mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
                        size = size - (2 * sizeof(unsigned int));
@@ -715,23 +643,54 @@ int prefetchReq(int acceptfd) {
                        }
                        memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
                        free(recvbuffer);
-
-                       /* Create socket to send information */
-                       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
-                               perror("prefetchReq():socket()");
-                               return -1;
+#if 0
+                       pthread_mutex_lock(&sockLock);
+                       sockIdFound = 0;
+                       pthread_mutex_unlock(&sockLock);
+                       for(i = 0; i < NUM_MACHINES; i++) {
+                               if(sockArray[i].mid == mid) {
+                                       sd = sockArray[i].sockid;
+                                       pthread_mutex_lock(&sockLock);
+                                       sockIdFound = 1;
+                                       pthread_mutex_unlock(&sockLock);
+                                       break;
+                               }
                        }
-                       bzero(&remoteAddr, sizeof(remoteAddr));
-                       remoteAddr.sin_family = AF_INET;
-                       remoteAddr.sin_port = htons(LISTEN_PORT);
-                       remoteAddr.sin_addr.s_addr = htonl(mid);
-
-                       if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
-                               printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
-                                               inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-                               close(sd);
-                               return -1;
+
+                       if(sockIdFound == 0) {
+                               if(sockCount < NUM_MACHINES) {
+
+#endif
+                                       /* Create socket to send information */
+                                       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+                                               perror("prefetchReq():socket()");
+                                               return -1;
+                                       }
+                                       bzero(&remoteAddr, sizeof(remoteAddr));
+                                       remoteAddr.sin_family = AF_INET;
+                                       remoteAddr.sin_port = htons(LISTEN_PORT);
+                                       remoteAddr.sin_addr.s_addr = htonl(mid);
+
+                                       if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+                                               printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
+                                                               inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+                                               close(sd);
+                                               return -1;
+                                       }
+
+#if 0
+                                       sockArray[sockCount].mid = mid;
+                                       sockArray[sockCount].sockid = sd;
+                                       pthread_mutex_lock(&sockLock);
+                                       sockCount++;
+                                       pthread_mutex_unlock(&sockLock);
+                               } else {
+                                       //TODO Fix for connecting to more than 2 machines && close socket
+                                       printf("%s(): Error: Currently works for only 2 machines\n", __func__);
+                                       return -1;
+                               }
                        }
+#endif
 
                        /*Process each oid */
                        if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
@@ -781,6 +740,7 @@ int prefetchReq(int acceptfd) {
                                        close(sd);
                                        return -1;
                                }
+
                                /* Calculate the oid corresponding to the offset value */
                                for(i = 0 ; i< numoffset ; i++) {
                                        /* Check for arrays  */
@@ -852,29 +812,19 @@ int prefetchReq(int acceptfd) {
                                }
                                free(offsetarry);
                        }
+                       close(sd);
                }
-       }
-       close(sd);
+       } while (length != -1);
        return 0;
 }
 
 int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
        int numbytes = 0;
 
-       if((numbytes = send(sd, control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-               printf("%s() Error: in sending PREFETCH RESPONSE to Coordinator at %s, %d\n", __func__, __FILE__, __LINE__);
-               free(sendbuffer);
-               return -1;
-       }
-
+       send_data(sd, control, sizeof(char));
        /* Send the buffer with its size */
-       if((numbytes = send(sd, sendbuffer, *(size), MSG_NOSIGNAL)) < *(size)) {
-               printf("%s() Error: in sending oid found at %s, %d size sent = %d, actual size = %d\n",
-                               __func__, __FILE__, __LINE__, numbytes, *(size));
-               free(sendbuffer);
-               return -1;
-       }
-
+       int length = *(size);
+       send_data(sd, sendbuffer, length);
        free(sendbuffer);
        return 0;
 }
@@ -933,21 +883,8 @@ checkversion:
                                                *((unsigned short *)(&msg[1]+size)) = newversion;
                                                size += sizeof(unsigned short);
                                                *((unsigned int *)(&msg[1]+size)) = threadid;
-                                               bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
-                                               if (bytesSent < 0){
-                                                       perror("processReqNotify():send()");
-                                                       close(sd);
-                                                       return;
-                                               } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){
-                                                       printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n", 
-                                                                       bytesSent, __FILE__, __LINE__);
-                                                       close(sd);
-                                                       return;
-                                               } else {
-                                                       close(sd);
-                                                       return;
-                                               }
-
+                                               size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
+                                               send_data(sd, msg, size);
                                        }
                                        close(sd);
                                }
@@ -961,4 +898,3 @@ checkversion:
        free(oidarry);
        free(versionarry);
 }
-
index e9d2d90b693843941dd682d8460ad782ee9b3181..485e7121cec61d8b9a03eae7cdea89b767a0bb04 100644 (file)
@@ -25,7 +25,6 @@
 #define LISTEN_PORT 2156
 #define NUM_THREADS 1
 #define PREFETCH_CACHE_SIZE 1048576 //1MB
-#define NUM_MACHINES 2
 #define CONFIG_FILENAME "dstm.conf"
 
 /* Global Variables */
@@ -49,14 +48,52 @@ unsigned int oidsPerBlock;
 unsigned int oidMin;
 unsigned int oidMax;
 
-/* Global variables to track mapping of socketid and remote mid */
+/************************************************************
+ * Global variables to map socketid and remote mid to
+ * reuse sockets
+ ***********************************************************/
 midSocketInfo_t midSocketArray[NUM_MACHINES];
-int sockCount = 0;
-int sockIdFound;
+int sockCount;                 //number of connections with all remote machines(one socket per mc)
+int sockIdFound;       //track if socket file descriptor is already established 
 
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
 
+/*******************************
+ * Send and Recv function calls 
+ *******************************/
+void send_data(int fd , void *buf, int buflen) {
+       char *buffer = (char *)(buf); 
+       int size = buflen;
+       int numbytes; 
+       while (size > 0) {
+               numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
+               if (numbytes == -1) {
+                       perror("send");
+                       printf("error: at %s, %d\n", __FILE__, __LINE__);
+                       exit(-1);
+               }
+               buffer += numbytes;
+               size -= numbytes;
+       }
+}
+
+void recv_data(int fd , void *buf, int buflen) {
+       char *buffer = (char *)(buf); 
+       int size = buflen;
+       int numbytes; 
+       while (size > 0) {
+               numbytes = recv(fd, buffer, size, 0);
+               if (numbytes == -1) {
+                       perror("recv");
+                       printf("error: at %s, %d\n", __FILE__, __LINE__);
+                       exit(-1);
+               }
+               buffer += numbytes;
+               size -= numbytes;
+       }
+}
+
 void printhex(unsigned char *ptr, int numBytes)
 {
        int i;
@@ -208,6 +245,7 @@ void transInit() {
        pthread_detach(tPrefetch);
 
        //Initialize mid to socketid mapping array
+       sockCount = 0;
        for(t = 0; t < NUM_MACHINES; t++) {
                midSocketArray[t].mid = 0;
                midSocketArray[t].sockid = 0;
@@ -685,31 +723,18 @@ void *transRequest(void *threadarg) {
        }
 
        /* Send bytes of data with TRANS_REQUEST control message */
-       if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
-               perror("Error sending fixed bytes for thread\n");
-               close(sd);
-               pthread_exit(NULL);
-       }
-
+       send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
+       
        /* Send list of machines involved in the transaction */
        {
                int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
-               if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
-                       perror("Error sending list of machines for thread\n");
-                       close(sd);
-                       pthread_exit(NULL);
-               }
+               send_data(sd, tdata->buffer->listmid, size);
        }
 
        /* Send oids and version number tuples for objects that are read */
        {
                int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
-               
-               if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
-                       perror("Error sending tuples for thread\n");
-                       close(sd);
-                       pthread_exit(NULL);
-               }
+               send_data(sd, tdata->buffer->objread, size);
        }
 
        /* Send objects that are modified */
@@ -718,20 +743,11 @@ void *transRequest(void *threadarg) {
                headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
                GETSIZE(size,headeraddr);
                size+=sizeof(objheader_t);
-               if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
-                       perror("Error sending obj modified for thread\n");
-                       close(sd);
-                       pthread_exit(NULL);
-               }
+               send_data(sd, headeraddr, size);
        }
 
        /* Read control message from Participant */
-       if((n = read(sd, &control, sizeof(char))) <= 0) {
-               perror("Error in reading control message from Participant\n");
-               close(sd);
-               pthread_exit(NULL);
-       }
-
+       recv_data(sd, &control, sizeof(char));
        recvcontrol = control;
 
        /* Update common data structure and increment count */
@@ -760,9 +776,7 @@ void *transRequest(void *threadarg) {
                pthread_exit(NULL);
        }
 
-       do {
-          retval = recv((int)sd, &control, sizeof(char), 0);
-       } while (retval < sizeof(char));
+       recv_data((int)sd, &control, sizeof(char)); 
 
        if(control == TRANS_UNSUCESSFUL) {
                //printf("DEBUG-> TRANS_ABORTED\n");
@@ -830,32 +844,28 @@ void decideResponse(thread_data_array_t *tdata) {
  * It returns a char that is only needed to check the correctness of execution of this function inside
  * transRequest()*/
 char sendResponse(thread_data_array_t *tdata, int sd) {
-       int n, N, sum, oidcount = 0, control;
+       int n, size, sum, oidcount = 0, control;
        char *ptr, retval = 0;
        unsigned int *oidnotfound;
 
        control = *(tdata->replyctrl);
-       if (send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-               perror("Error sending ctrl message for participant\n");
-               return 0;
-       }
+       send_data(sd, &control, sizeof(char));
 
-       //FIXME read missing objects 
+       //TODO read missing objects to be used during object migration
        /* If the decided response is due to a soft abort and missing objects at the Participant's side */
        /*
        if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
                // Read list of objects missing  
-               if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
-                       N = oidcount * sizeof(unsigned int);
+               recv_data(sd, &oidcount, sizeof(int));
+               //if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
+               if(oidcount != 0) {
+                       size = oidcount * sizeof(unsigned int);
                        if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
                                printf("Calloc error %s, %d\n", __FILE__, __LINE__);
                                return 0;
                        }
                        ptr = (char *) oidnotfound;
-                       do {
-                               n = read(sd, ptr+sum, N-sum);
-                               sum += n;
-                       } while(sum < N && n !=0);
+                       recv_data(sd, ptr, size);
                }
                retval =  TRANS_SOFT_ABORT;
        }
@@ -905,31 +915,20 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        char readrequest[sizeof(char)+sizeof(unsigned int)];
        readrequest[0] = READ_REQUEST;
        *((unsigned int *)(&readrequest[1])) = oid;
-       if (send(sd, readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
-               perror("getRemoteObj(): error sending message\n");
-               return NULL;
-       }
+       send_data(sd, readrequest, sizeof(readrequest));
 
        /* Read response from the Participant */
-       if((val = read(sd, &control, sizeof(char))) <= 0) {
-               printf("getRemoteObj(): error no response, %d\n", val);
-               return NULL;
-       }
+       recv_data(sd, &control, sizeof(char));
 
        switch(control) {
                case OBJECT_NOT_FOUND:
                        return NULL;
                case OBJECT_FOUND:
                        /* Read object if found into local cache */
-                       if((val = read(sd, &size, sizeof(int))) <= 0) {
-                               perror("getRemoteObj(): error in reading size\n");
-                               return NULL;
-                       }
+                       recv_data(sd, &size, sizeof(int));
                        objcopy = objstrAlloc(record->cache, size);
-                       int sum = 0;
-                       while (sum < size) {
-                               sum += read(sd, (char *)objcopy+sum, size-sum);
-                       }
+                       recv_data(sd, objcopy, size);
+                       
                        /* Insert into cache's lookup table */
                        chashInsert(record->lookupTable, oid, objcopy); 
                        break;
@@ -1599,10 +1598,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
 
        /* Send TRANS_PREFETCH control message */
        control = TRANS_PREFETCH;
-       if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-               perror("sendPrefetchReq() Sending TRANS_PREFETCH");
-               return;
-       }
+       send_data(sd, &control, sizeof(char));
 
        /* Send Oids and offsets in pairs */
        tmp = mcpilenode->objpiles;
@@ -1622,21 +1618,14 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
                        *((short*)(oidnoffset + off)) = tmp->offset[i];
                        off+=sizeof(short);
                }
-               if (send(sd, oidnoffset, len , MSG_NOSIGNAL) < len) {
-                       perror("Sending oids and offsets");
-                       return;
-               }
-               
+               send_data(sd, oidnoffset, len);
                tmp = tmp->next;
        }
 
        /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
        endpair = -1;
-       if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
-               perror("Error sending endpair\n");
-               return;
-       }
-
+       send_data(sd, &endpair, sizeof(int));
+       
        return;
 }
 
@@ -1646,67 +1635,61 @@ int getPrefetchResponse(int sd) {
        unsigned int oid;
        void *modptr, *oldptr;
 
-       if((numbytes = recv((int)sd, &length, sizeof(int), 0)) <= 0) {
-               printf("%s() Error: in receiving length at %s, %d\n", __func__, __FILE__, __LINE__);
+       recv_data((int)sd, &length, sizeof(int)); 
+       size = length - sizeof(int);
+       if((recvbuffer = calloc(1, size)) == NULL) {
+               printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
                return -1;
-       } else {
+       }
+       recv_data((int)sd, recvbuffer, size);
+
+       control = *((char *) recvbuffer);
+       if(control == OBJECT_FOUND) {
                numbytes = 0;
-               size = length - sizeof(int);
-               if((recvbuffer = calloc(1, size)) == NULL) {
-                       printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
+               oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+               size = size - (sizeof(char) + sizeof(unsigned int));
+               pthread_mutex_lock(&prefetchcache_mutex);
+               if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
+                       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(&prefetchcache_mutex);
+                       free(recvbuffer);
                        return -1;
                }
-               while(numbytes < size) {
-                       numbytes += recv((int)sd, recvbuffer+numbytes, size-numbytes, 0);
-               }
-               
-               control = *((char *) recvbuffer);
-               if(control == OBJECT_FOUND) {
-                       numbytes = 0;
-                       oid = *((unsigned int *)(recvbuffer + sizeof(char)));
-                       size = size - (sizeof(char) + sizeof(unsigned int));
-                       pthread_mutex_lock(&prefetchcache_mutex);
-                       if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
-                               printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
-                               pthread_mutex_unlock(&prefetchcache_mutex);
-                               free(recvbuffer);
-                               return -1;
-                       }
-                       pthread_mutex_unlock(&prefetchcache_mutex);
-                       memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
-
-                       /* Insert the oid and its address into the prefetch hash lookup table */
-                       /* Do a version comparison if the oid exists */
-                       if((oldptr = prehashSearch(oid)) != NULL) {
-                               /* If older version then update with new object ptr */
-                               if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
-                                       prehashRemove(oid);
-                                       prehashInsert(oid, modptr);
-                               } else {
-                                       /* TODO modptr should be reference counted */
-                               }
-                       } else {/* Else add the object ptr to hash table*/
+               pthread_mutex_unlock(&prefetchcache_mutex);
+               memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
+
+               /* Insert the oid and its address into the prefetch hash lookup table */
+               /* Do a version comparison if the oid exists */
+               if((oldptr = prehashSearch(oid)) != NULL) {
+                       /* If older version then update with new object ptr */
+                       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
+                               prehashRemove(oid);
                                prehashInsert(oid, modptr);
+                       } else {
+                               /* TODO modptr should be reference counted */
                        }
-                       /* Lock the Prefetch Cache look up table*/
-                       pthread_mutex_lock(&pflookup.lock);
-                       /* Broadcast signal on prefetch cache condition variable */ 
-                       pthread_cond_broadcast(&pflookup.cond);
-                       /* Unlock the Prefetch Cache look up table*/
-                       pthread_mutex_unlock(&pflookup.lock);
-               } else if(control == OBJECT_NOT_FOUND) {
-                       oid = *((unsigned int *)(recvbuffer + sizeof(char)));
-                       /* TODO: For each object not found query DHT for new location and retrieve the object */
-                       /* Throw an error */
-                       printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
-                       free(recvbuffer);
-                       exit(-1);
-               } else {
-                       printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
+               } else {/* Else add the object ptr to hash table*/
+                       prehashInsert(oid, modptr);
                }
+               /* Lock the Prefetch Cache look up table*/
+               pthread_mutex_lock(&pflookup.lock);
+               /* Broadcast signal on prefetch cache condition variable */ 
+               pthread_cond_broadcast(&pflookup.cond);
+               /* Unlock the Prefetch Cache look up table*/
+               pthread_mutex_unlock(&pflookup.lock);
+       } else if(control == OBJECT_NOT_FOUND) {
+               oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+               /* TODO: For each object not found query DHT for new location and retrieve the object */
+               /* Throw an error */
+               printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
                free(recvbuffer);
+               exit(-1);
+       } else {
+               printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
        }
 
+       free(recvbuffer);
+
        return 0;
 }
 
@@ -1762,22 +1745,7 @@ int startRemoteThread(unsigned int oid, unsigned int mid)
        {
                msg[0] = START_REMOTE_THREAD;
                memcpy(&msg[1], &oid, sizeof(unsigned int));
-
-               bytesSent = send(sock, msg, 1 + sizeof(unsigned int), 0);
-               if (bytesSent < 0)
-               {
-                       perror("startRemoteThread():send()");
-                       status = -1;
-               }
-               else if (bytesSent != 1 + sizeof(unsigned int))
-               {
-                       printf("startRemoteThread(): error, sent %d bytes\n", bytesSent);
-                       status = -1;
-               }
-               else
-               {
-                       status = 0;
-               }
+               send_data(sock, msg, 1 + sizeof(unsigned int));
        }
 
        close(sock);
@@ -1989,17 +1957,8 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
                *((unsigned int *)(&msg[1] + size)) = threadid;
 
                pthread_mutex_lock(&(ndata->threadnotify));
-               bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int) , 0);
-               if (bytesSent < 0){
-                       perror("reqNotify():send()");
-                       status = -1;
-               } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int)){
-                       printf("reNotify(): error, sent %d bytes %s, %d\n", bytesSent, __FILE__, __LINE__);
-                       status = -1;
-               } else {
-                       status = 0;
-               }
-               
+               size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
+               send_data(sock, msg, size);
                pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
                pthread_mutex_unlock(&(ndata->threadnotify));
        }
@@ -2080,17 +2039,8 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
                        size+= sizeof(unsigned short);
                        *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
 
-                       bytesSent = send(sock, msg, (1 + 2*sizeof(unsigned int) + sizeof(unsigned short)), 0);
-                       if (bytesSent < 0){
-                               perror("notifyAll():send()");
-                               status = -1;
-                       } else if (bytesSent != 1 + 2*sizeof(unsigned int) + sizeof(unsigned short)){
-                               printf("notifyAll(): error, sent %d bytes %s, %d\n", 
-                                               bytesSent, __FILE__, __LINE__);
-                               status = -1;
-                       } else {
-                               status = 0;
-                       }
+                       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
+                       send_data(sock, msg, size);
                }
                //close socket
                close(sock);