my changes:
authorbdemsky <bdemsky>
Thu, 14 Jun 2007 18:09:46 +0000 (18:09 +0000)
committerbdemsky <bdemsky>
Thu, 14 Jun 2007 18:09:46 +0000 (18:09 +0000)
1) Handle sigpipe
2) allow quick restart of the server...deal with the port issue
3) combine some requests
4) remove commented out code to make it readable

Robust/src/Runtime/DSTM/interface/Makefile
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 6ddba6669f7d69c320bcbdee17d33cec98f83b7c..2f841f28ccdf0cc0035cbc0f146f20e017e130d5 100644 (file)
@@ -12,5 +12,11 @@ all:
        gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
        gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
 
+
+mac:
+       gcc -DMAC -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+       gcc -DMAC -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+       gcc -DMAC -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+
 clean:
        rm -rf d-2 d-1 demsky
index eb0f655c56348bbbad43004e3cdf9674f2358a47..de0af029e82499052944a8208f20248d458b0586 100644 (file)
@@ -1,6 +1,10 @@
 #ifndef _DSTM_H_
 #define _DSTM_H_
 
+#ifdef MAC
+#define MSG_NOSIGNAL 0
+#endif
+
 //Coordinator Messages
 #define READ_REQUEST           1
 #define READ_MULT_REQUEST      2
index 5e9aa5997d001e13342d0c16f1388e07877c6b46..34487ff00dda826e000af1c5a0e6fd4e9fac56a0 100644 (file)
@@ -26,9 +26,6 @@ int dstmInit(void)
        if (lhashCreate(HASH_SIZE, LOADFACTOR))
                return 1; //failure
        
-       //pthread_t threadListen;
-       //pthread_create(&threadListen, NULL, dstmListen, NULL);
-       
        return 0;
 }
 
@@ -40,6 +37,7 @@ void *dstmListen()
        socklen_t addrlength = sizeof(struct sockaddr);
        pthread_t thread_dstm_accept;
        int i;
+       int setsockflag=1;
 
        listenfd = socket(AF_INET, SOCK_STREAM, 0);
        if (listenfd == -1)
@@ -48,6 +46,17 @@ void *dstmListen()
                exit(1);
        }
 
+       if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
+         perror("socket");
+         exit(1);
+       }
+#ifdef MAC
+       if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
+         perror("socket");
+         exit(1);
+       }
+#endif
+
        my_addr.sin_family = AF_INET;
        my_addr.sin_port = htons(LISTEN_PORT);
        my_addr.sin_addr.s_addr = INADDR_ANY;
@@ -103,31 +112,19 @@ void *dstmAccept(void *acceptfd)
                        size = sizeof(objheader_t) + sizeof(classsize[h->type]);
                        if (h == NULL) {
                                ctrl = OBJECT_NOT_FOUND;
-                               if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
+                               if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
                                        perror("Error sending control msg to coordinator\n");
                                }
                        } else {
-                               //char responsemessage[sizeof(char)+sizeof(int)];
                                /* Type */
-                               ctrl = OBJECT_FOUND;
-                               if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
-                                       perror("Error sending control msg to coordinator\n");
-                               }
-
-                               //responsemessage[0]=OBJECT_FOUND;
-                               /* Size of object */
-                               //*((int *)(&responsemessage[1])) = sizeof(objheader_t) + classsize[h->type];
-                               //if(send((int)acceptfd, &responsemessage, sizeof(responsemessage), 0) < 0) {
-                               //      perror("Error sending control msg to coordinator\n");
-                               //}
-
-                               /* Size of object */
-                               if(send((int)acceptfd, &size, sizeof(int), 0) < 0) {
-                                       perror("Error sending size of object to coordinator\n");
-                               }
-                               if(send((int)acceptfd, h, size, 0) < 0) {
-                                       perror("Error in sending object\n");
-                               }
+                         char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+                         *((int *)&msg[1])=size;
+                         if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
+                           perror("Error sending size of object to coordinator\n");
+                         }
+                         if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
+                           perror("Error in sending object\n");
+                         }
                        }
                        break;
                
@@ -144,7 +141,7 @@ void *dstmAccept(void *acceptfd)
                        break;
 
                case TRANS_REQUEST:
-                       printf("DEBUG -> Recv TRANS_REQUEST from Coordinator accept_fd = %d\n", acceptfd);
+                       printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
                        if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
                                printf("Error in readClientReq\n");
                        }
@@ -176,11 +173,9 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
        fixed.control = TRANS_REQUEST;
        do {
                n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
-       //      printf("DEBUG -> 1. Reading %d bytes \n", n);
                sum += n;
        } while(sum < N && n != 0); 
 
-       //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
        // Read list of mids
        int mcount = fixed.mcount;
        N = mcount * sizeof(unsigned int);
@@ -189,7 +184,6 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
        sum = 0;
        do {
                n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
-       //      printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
                sum += n;
        } while(sum < N && n != 0);
 
@@ -198,15 +192,11 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
        N = numread * (sizeof(unsigned int) + sizeof(short));
        char objread[N];
        if(numread != 0) { // If pile contains objects to be read 
-       //      N = numread * (sizeof(unsigned int) + sizeof(short));
-       //      char objread[N];
                sum = 0;
                do {
                        n = recv((int)acceptfd, (void *) objread, N, 0);
-               //      printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
                        sum += n;
                } while(sum < N && n != 0);
-//             printf("DEBUG -> Recv objs from Coordinator %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
        }
        
        // Read modified objects
@@ -218,31 +208,27 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
                sum = 0;
                do { // Recv the objs that are modified at Coordinator
                        n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
-               //      printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
                        sum += n;
                } while (sum < fixed.sum_bytes && n != 0);
        }
 
        //Send control message as per all votes from all oids in the machine
-       if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0) {
-               printf("Handle Trans Request Error %s, %d\n", __FILE__, __LINE__);
+       if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
+               printf("Handle req error\n");
        }
 
        //Read for new control message from Coordiator
        if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
-               printf("DEBUG -> Error receiving control, received %d\n", control);
+               perror("Error in receiving control message");
                return 1;
        }
 
-       printf("DEBUG-> Control message after first call to handleTransReq is %d\n", control);
-       fflush(stdout);
-
        switch(control) {
                case TRANS_ABORT:
-                       printf("DEBUG -> Recv TRANS_ABORT from Coordinator accept_fd %d\n", acceptfd) ;
+                       printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
                        //send ack to coordinator
                        sendctrl = TRANS_SUCESSFUL;
-                       if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
+                       if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
                                perror("Error sending ACK to coordinator\n");
                                return 1;
                        }
@@ -267,26 +253,25 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
                        }
                        break;
                case TRANS_ABORT_BUT_RETRY_COMMIT:
-                       printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator acceptfd = %d\n", acceptfd);
+                       printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
                        //Process again after waiting for sometime and on prev control message sent
                        sleep(2);
                        switch(prevctrl) {
                                case TRANS_AGREE:
                                        sendctrl = TRANS_AGREE;
-                                       if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
+                                       if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
                                                perror("Error sending ACK to coordinator\n");
                                        }
-                                       //sleep(5);
                                        break;
                                case TRANS_SOFT_ABORT:
                                        if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
-                                               printf("Handle Trans Request Error for second call%s, %d\n", __FILE__, __LINE__);
+                                               printf("Handle req error\n");
                                        }
                                        //If no change in previous control message that was sent then ABORT transaction
                                        if(newctrl == TRANS_SOFT_ABORT){
                                                //Send ABORT
                                                newctrl = TRANS_DISAGREE;
-                                               if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
+                                               if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
                                                        perror("Error sending ACK to coordinator\n");
                                                }
                                                //Set the reference count of the object to 1 in mainstore for garbage collection
@@ -301,11 +286,10 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
                                                        ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
                                                        ((objheader_t *)ptr)->status &= ~(LOCK);                
                                                }
-                                       //      return 0;
                                        } else if(newctrl == TRANS_AGREE) {
                                                newctrl = TRANS_AGREE;
                                                //Send new control message
-                                               if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
+                                               if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
                                                        perror("Error sending ACK to coordinator\n");
                                                }
                                        }
@@ -316,14 +300,13 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
                        break;
                case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
                        //TODO expect another transrequest from client
-                       printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator accept_fd%d\n", acceptfd);
+                       printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
                        break;
                default:
-                       printf("No response to TRANS_AGREE OR DISAGREE control\n");
+                       printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
                        //TODO Use fixed.trans_id  TID since Client may have died
                        break;
        }
-
        //Free memory
        printf("DEBUG -> Freeing...");
        fflush(stdout);
@@ -363,7 +346,8 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
        
        //Process each object present in the pile 
        ptr = modptr;
-       
+       //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
+       fflush(stdout);
        //Process each oid in the machine pile/ group
        for (i = 0; i < fixed->numread + fixed->nummod; i++) {
                if (i < fixed->numread) {//Object is read
@@ -394,18 +378,20 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
                                        v_nomatch++;
                                        //send TRANS_DISAGREE to Coordinator
                                        control = TRANS_DISAGREE;
-                                       if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+                                       if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
                                                perror("Error in sending control to the Coordinator\n");
                                                return 0;
                                        }
-                                       printf("DEBUG -> Sending TRANS_DISAGREE acceptfd = %d\n", acceptfd);
+                                       printf("DEBUG -> Sending TRANS_DISAGREE\n");
                                        return control;
                                }
                        } else {//Obj is not locked , so lock object
                                ((objheader_t *)mobj)->status |= LOCK;
+                               //FOR TESTING
+                               sleep(1);
                                //Save all object oids that are locked on this machine during this transaction request call
                                oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
-                               printf("DEBUG -> Obj locked are %d\n",((objheader_t *)mobj)->oid);
+                               printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid);
                                objlocked++;
                                if (version == ((objheader_t *)mobj)->version) { //If versions match
                                        v_matchnolock++;
@@ -413,11 +399,11 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
                                        v_nomatch++;
                                        //send TRANS_DISAGREE to Coordinator
                                        control = TRANS_DISAGREE;
-                                       if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+                                       if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
                                                perror("Error in sending control to the Coordinator\n");
                                                return 0;
                                        }
-                                       printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd);
+                                       printf("DEBUG -> Sending TRANS_DISAGREE\n");
                                        return control;
                                }
                        }
@@ -436,28 +422,28 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
        if(v_matchnolock == fixed->numread + fixed->nummod) {
                //send TRANS_AGREE to Coordinator
                control = TRANS_AGREE;
-               if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+               if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
                        perror("Error in sending control to Coordinator\n");
                        return 0;
                }
-               printf("DEBUG -> Sending TRANS_AGREE accept_fd = %d\n", acceptfd);
+               printf("DEBUG -> Sending TRANS_AGREE\n");
        }
        //Condition to send TRANS_SOFT_ABORT
        if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
                //send TRANS_SOFT_ABORT to Coordinator
                control = TRANS_SOFT_ABORT;
-               if((val = write(acceptfd, &control, sizeof(char))) <=0 ) {
-                       perror("Error in sending control back to coordinator\n");
-                       return 0;
-               }
-               printf("DEBUG -> Sending TRANS_SOFT_ABORT accept_fd = %d\n", acceptfd);
+               char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
+               *((int*)&msg[1])=objnotfound;
+
+               printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
                //send number of oids not found and the missing oids 
-               if((val = write(acceptfd, &objnotfound, sizeof(int))) <= 0) {
+               if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
                        perror("Error in sending no of objects that are not found\n");
                        return 0;
                }
                if(objnotfound != 0) { 
-                       if((val = write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound))) <= 0) {
+                 int size=sizeof(unsigned int)*objnotfound;
+                 if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
                                perror("Error in sending objects that are not found\n");
                                return 0;
                        }
@@ -500,12 +486,12 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
        //Process each modified object saved in the mainobject store
        for(i=0; i<transinfo->nummod; i++) {
                if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
-                       printf("mhashsearch returns NULL %s, %d\n", __FILE__, __LINE__);
+                       printf("mhashserach returns NULL\n");
                }
                //change reference count of older address and free space in objstr ??
                header->rcount = 1; //Not sure what would be th val
                //change ptr address in mhash table
-               printf("DEBUG -> Removing object oid = %d\n", transinfo->objmod[i]);
+               printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]);
                mhashRemove(transinfo->objmod[i]);
                mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
                offset += sizeof(objheader_t) + classsize[header->type];
@@ -524,8 +510,8 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
        //send ack to coordinator
        control = TRANS_SUCESSFUL;
        //FOR TESTING
-       printf("DEBUG-> Sending TRANS_SUCCESSFUL from accept_fd = %d\n", acceptfd);
-       if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
+       printf("DEBUG-> Transaction is SUCCESSFUL \n");
+       if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
                perror("Error sending ACK to coordinator\n");
        }
        
index 2c47e2ac932190b66fe62205f0a39e3a7ddbade8..0af91b9dae288dc1727ebed7533abc1856fcc982 100644 (file)
@@ -26,7 +26,7 @@ transrecord_t *transStart()
 
 objheader_t *transRead(transrecord_t *record, unsigned int oid)
 {      
-//     printf("Enter TRANS_READ\n");
+       printf("Enter TRANS_READ\n");
        unsigned int machinenumber;
        objheader_t *tmp, *objheader;
        void *objcopy;
@@ -34,7 +34,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
        void *buf;
                //check cache
        if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
-               printf("transRead oid %d found local\n %s, %d", oid, __FILE__, __LINE__);
+               printf("DEBUG -> transRead oid %d found local\n", oid);
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                //Look up in Machine lookup table and found
@@ -55,7 +55,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
                        //If object is not found in Remote location
-                       printf("Object oid = %d not found in Machine %d at %s, %d\n", oid, machinenumber, __FILE__, __LINE__);
+                       printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
                        return NULL;
                }
                else {
@@ -85,14 +85,14 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
        unsigned int *oidnotfound;
        objheader_t *header;
 
-//     printf("DEBUG -> pilecount is %d\n", tdata->pilecount);
+       printf("DEBUG -> pilecount is %d\n", tdata->pilecount);
        //Check common data structure 
        for (i = 0 ; i < tdata->pilecount ; i++) {
                //Switch case
                control = tdata->recvmsg[i].rcv_status;
                switch(control) {
                        case TRANS_DISAGREE:
-//                             printf("DEBUG-> Inside TRANS_DISAGREE\n");
+                               printf("DEBUG-> Inside TRANS_DISAGREE\n");
                                transdisagree++;
                                //Free transaction records
                                objstrDelete(tdata->rec->cache);
@@ -101,7 +101,7 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
                                //send Abort
                                ctrl = TRANS_ABORT;
                                for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
-                                       if (write(sd, &ctrl, sizeof(char)) < 0) {
+                                       if (send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
                                                perror("Error sending ctrl message for participant\n");
                                                return 1;
                                        }
@@ -109,13 +109,13 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
                                return 0;
 
                        case TRANS_AGREE:
-                               printf("Inside TRANS_AGREE\n");
+                               printf("DEBUG-> Inside TRANS_AGREE\n");
                                PRINT_TID(tdata);
                                transagree++;
                                break;
                                
                        case TRANS_SOFT_ABORT:
-                               printf("Inside TRANS_SOFT_ABORT\n");
+                               printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
                                transsoftabort++;
                                /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
                                if ((i == tdata->thread_id) && (val == 0)) {
@@ -137,6 +137,7 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
                                                } while(sum < N && n !=0);
                                        }
                                }
+
                                break;
                        default:
                                printf("Participant sent unknown message\n");
@@ -147,8 +148,8 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
        if(transagree == tdata->pilecount){
                //Send Commit
                ctrl = TRANS_COMMIT;
-               printf("Sending TRANS_COMMIT accept_fd = %d\n", sd);
-               if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
+               printf("Sending TRANS_COMMIT\n");
+               if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
                        perror("Error sending ctrl message for participant\n");
                        return 1;
                }
@@ -158,11 +159,12 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
        if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
                //Send abort but retry commit
                ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
-               printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT acceptfd = %d\n", sd);
-               if((retval = write(sd, &ctrl, sizeof(char))) <= 0) {
+               printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
+               if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
                        perror("Error sending ctrl message for participant\n");
                        return 1;
                }
+       /*      
                //Sleep and the resend the request
                sleep(2);
                //Read new control message from Participant
@@ -176,13 +178,14 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
                tdata->recvmsg[tdata->thread_id].rcv_status = control;
                val = 1;
                decideResponse(tdata, sd, val);         //Second call to decideResponse(); indicated by parameter val = 1
+       */      
        }
 
        if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
                //Send abort but retry commit after relooking up objects
                ctrl = TRANS_ABORT;
                printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
-               if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
+               if((retval = send(sd, &ctrl, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
                        perror("Error sending ctrl message for participant\n");
                        return 1;
                }
@@ -231,30 +234,38 @@ void *transRequest(void *threadarg) {
        printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
 //     printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
 //     printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
-       if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
+       if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
                perror("Error sending fixed bytes for thread");
                return NULL;
        }
        //Send list of machines involved in the transaction
 //     printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
-       if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
-               perror("Error sending list of machines for thread");
-               return NULL;
+       {
+         int size=sizeof(unsigned int)*tdata->pilecount;
+         if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
+           perror("Error sending list of machines for thread");
+           return NULL;
+         }
        }
        //Send oids and version number tuples for objects that are read
 //     printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
 //     printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
-       if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
-               perror("Error sending tuples for thread");
-               return NULL;
+       {
+         int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
+         if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
+           perror("Error sending tuples for thread");
+           return NULL;
+         }
        }
        //Send objects that are modified
        for(i = 0; i < tdata->buffer->f.nummod ; i++) {
-               headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
-               if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type])  < 0) {
-                       perror("Error sending obj modified for thread");
-                       return NULL;
-               }
+         int size;
+         headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+         size=sizeof(objheader_t)+classsize[headeraddr->type];
+         if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
+           perror("Error sending obj modified for thread");
+           return NULL;
+         }
        }
        
        //Read message  control message from participant side
@@ -457,7 +468,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        char readrequest[sizeof(char)+sizeof(unsigned int)];
        readrequest[0] = READ_REQUEST;
        *((unsigned int *)(&readrequest[1])) = oid;
-       if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
+       if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
                perror("Error sending message\n");
                return NULL;
        }