Fixed bugs
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index 0af91b9dae288dc1727ebed7533abc1856fcc982..e9daecf759fbdb258f56d6c4c25ff1823aa8edbf 100644 (file)
@@ -9,6 +9,9 @@
 #include<sys/socket.h>
 #include<netdb.h>
 #include<netinet/in.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <time.h>
 
 #define LISTEN_PORT 2156
 #define MACHINE_IP "127.0.0.1"
 
 extern int classsize[];
 
+void randomdelay(void)
+{
+       struct timespec req, rem;
+       time_t t;
+
+       t = time(NULL);
+       req.tv_sec = 0;
+       req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec; NOTE(review): t is wall-clock time in whole seconds, so this is not random -- callers in the same second sleep equally long
+       nanosleep(&req, &rem);
+       return;
+}
+
 transrecord_t *transStart()
 {
        transrecord_t *tmp = malloc(sizeof(transrecord_t));
@@ -26,7 +41,6 @@ transrecord_t *transStart()
 
 objheader_t *transRead(transrecord_t *record, unsigned int oid)
 {      
-       printf("Enter TRANS_READ\n");
        unsigned int machinenumber;
        objheader_t *tmp, *objheader;
        void *objcopy;
@@ -34,7 +48,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
        void *buf;
                //check cache
        if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
-               printf("DEBUG -> transRead oid %d found local\n", oid);
+               //printf("DEBUG -> transRead oid %d found local\n", oid);
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                //Look up in Machine lookup table and found
@@ -55,11 +69,11 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
                        //If object is not found in Remote location
-                       printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
+                       //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
                        return NULL;
                }
                else {
-                       printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
+                       //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
                        return(objcopy);
                }
        } 
@@ -77,127 +91,105 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
        chashInsert(record->lookupTable, tmp->oid, tmp);
        return tmp;
 }
-//int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
-int decideResponse(thread_data_array_t *tdata, int sd, int val) {
-       int i, n, N, sum, retval, oidcount = 0;
-       int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
-       char ctrl, control, *ptr;
-       unsigned int *oidnotfound;
-       objheader_t *header;
 
-       printf("DEBUG -> pilecount is %d\n", tdata->pilecount);
+//This function decides the response that the machine that initiated the transaction request
+//needs to send to all other machines involved in the transaction
+
+int decideResponse(thread_data_array_t *tdata) {
+       char control;
+       int i, transagree = 0, transdisagree = 0, transsoftabort = 0;
+
        //Check common data structure 
        for (i = 0 ; i < tdata->pilecount ; i++) {
                //Switch case
                control = tdata->recvmsg[i].rcv_status;
                switch(control) {
                        case TRANS_DISAGREE:
-                               printf("DEBUG-> Inside TRANS_DISAGREE\n");
+                               printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
                                transdisagree++;
-                               //Free transaction records
-                               objstrDelete(tdata->rec->cache);
-                               chashDelete(tdata->rec->lookupTable);
-                               free(tdata->rec);
-                               //send Abort
-                               ctrl = TRANS_ABORT;
-                               for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
-                                       if (send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
-                                               perror("Error sending ctrl message for participant\n");
-                                               return 1;
-                                       }
-                               }
-                               return 0;
+                               break;
 
                        case TRANS_AGREE:
-                               printf("DEBUG-> Inside TRANS_AGREE\n");
-                               PRINT_TID(tdata);
+                               printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
                                transagree++;
                                break;
                                
                        case TRANS_SOFT_ABORT:
-                               printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
+                               printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
                                transsoftabort++;
-                               /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
-                               if ((i == tdata->thread_id) && (val == 0)) {
-                                       //Read list of objects missing
-                                       if(read(sd, &oidcount, sizeof(int)) != 0) {
-                                               //Break if only objs are locked at the Participant side
-                                               if (oidcount == 0) {
-                                                       break;
-                                               }
-                                               transsoftabortmiss++;
-                                               N = oidcount * sizeof(unsigned int);
-                                               if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
-                                                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                                               }
-                                               ptr = (char *) oidnotfound;
-                                               do {
-                                                       n = read(sd, ptr+sum, N-sum);
-                                                       sum += n;
-                                               } while(sum < N && n !=0);
-                                       }
-                               }
-
                                break;
                        default:
-                               printf("Participant sent unknown message\n");
+                               printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+                               return -1;
                }
        }
        
        //Decide what control message to send to Participant    
-       if(transagree == tdata->pilecount){
+       if(transdisagree > 0) {
+               //Send Abort
+               *(tdata->replyctrl) = TRANS_ABORT;
+               printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
+               objstrDelete(tdata->rec->cache);
+               chashDelete(tdata->rec->lookupTable);
+               free(tdata->rec);
+       } else if(transagree == tdata->pilecount){
                //Send Commit
-               ctrl = TRANS_COMMIT;
-               printf("Sending TRANS_COMMIT\n");
-               if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
-                       perror("Error sending ctrl message for participant\n");
-                       return 1;
-               }
-               //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
+               *(tdata->replyctrl) = TRANS_COMMIT;
+               printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
+               objstrDelete(tdata->rec->cache);
+               chashDelete(tdata->rec->lookupTable);
+               free(tdata->rec);
+       } else if(transsoftabort > 0 && transdisagree == 0) {
+               //Send Abort
+               *(tdata->replyctrl) = TRANS_ABORT;
+               *(tdata->replyretry) = 1;
+               //objstrDelete(tdata->rec->cache);
+               //chashDelete(tdata->rec->lookupTable);
+               //free(tdata->rec);
+               printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
+       } else {
+               printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
+               return -1;
        }
+       
+       return 0;
+}
+//Reads any missing-object list sent by the Participant on sd, then sends the decided reply (*tdata->replyctrl) on sd
+char sendResponse(thread_data_array_t *tdata, int sd) {
+       int n, N, sum = 0, oidcount = 0; //sum is the byte offset into the oid list read so far; it must start at 0
+       char *ptr, retval = 0;
+       unsigned int *oidnotfound;
 
-       if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
-               //Send abort but retry commit
-               ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
-               printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
-               if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
-                       perror("Error sending ctrl message for participant\n");
-                       return 1;
-               }
-       /*      
-               //Sleep and the resend the request
-               sleep(2);
-               //Read new control message from Participant
-
-               if((n = read(sd, &control, sizeof(char))) <= 0) {
-                       perror("No bytes are read for participant\n");
-                       return 1;
+       //If this thread's Participant replied TRANS_SOFT_ABORT, it may also send a list of missing objects
+       if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
+               //Read list of objects missing
+               if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
+                       //Reached only when oidcount is non-zero (zero: objs are only locked at the Participant side)
+                       N = oidcount * sizeof(unsigned int);
+                       if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
+                               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                       }
+                       ptr = (char *) oidnotfound;
+                       do {
+                               n = read(sd, ptr+sum, N-sum);
+                               if(n > 0) sum += n; //a failed read returns -1 and must not move the offset backwards
+                       } while(sum < N && n > 0); //stop on end-of-stream (0) or error (-1)
                }
-               
-               //Update common data structure and increment count
-               tdata->recvmsg[tdata->thread_id].rcv_status = control;
-               val = 1;
-               decideResponse(tdata, sd, val);         //Second call to decideResponse(); indicated by parameter val = 1
-       */      
+               retval =  TRANS_SOFT_ABORT;
        }
-
-       if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
-               //Send abort but retry commit after relooking up objects
-               ctrl = TRANS_ABORT;
-               printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
-               if((retval = send(sd, &ctrl, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-                       perror("Error sending ctrl message for participant\n");
-                       return 1;
-               }
-               //TODO
-               //Relook up objects
-               //update location table
-               
-               //Free pointers
-               free(oidnotfound);
+       //If the decided response is TRANS_ABORT
+       if(*(tdata->replyctrl) == TRANS_ABORT) {
+               retval = TRANS_ABORT;
        }
-       
-       return 0;
+       if(*(tdata->replyctrl) == TRANS_COMMIT) {
+               retval = TRANS_COMMIT;
+       }
+       // Send response to the Participant
+       if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
+               perror("Error sending ctrl message for participant\n");
+       }
+
+       return retval;
 }
 
 void *transRequest(void *threadarg) {
@@ -206,54 +198,46 @@ void *transRequest(void *threadarg) {
        struct hostent *server;
        thread_data_array_t *tdata;
        objheader_t *headeraddr;
-       //unsigned int *oidnotfound;
        char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
-       char machineip[16];
+       char machineip[16], retval;
 
        tdata = (thread_data_array_t *) threadarg;
        //Send Trans Request
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-               perror("Error in socket for TRANS_REQUEST");
+               perror("Error in socket for TRANS_REQUEST\n");
                return NULL;
        }
        bzero((char*) &serv_addr, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(LISTEN_PORT);
-       //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
        midtoIP(tdata->mid,machineip);
        machineip[15] = '\0';
        serv_addr.sin_addr.s_addr = inet_addr(machineip);
 
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
-               perror("Error in connect for TRANS_REQUEST");
+               perror("Error in connect for TRANS_REQUEST\n");
                return NULL;
        }
-
-       //Multiple writes for sending packets of data 
-       //Send first few fixed bytes of the TRANS_REQUEST protocol
-       printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
-//     printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
-//     printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
+       
+       printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
+       //Send bytes of data with TRANS_REQUEST control message
        if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
-               perror("Error sending fixed bytes for thread");
+               perror("Error sending fixed bytes for thread\n");
                return NULL;
        }
        //Send list of machines involved in the transaction
-//     printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
        {
          int size=sizeof(unsigned int)*tdata->pilecount;
          if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
-           perror("Error sending list of machines for thread");
+           perror("Error sending list of machines for thread\n");
            return NULL;
          }
        }
        //Send oids and version number tuples for objects that are read
-//     printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
-//     printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
        {
          int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
          if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
-           perror("Error sending tuples for thread");
+           perror("Error sending tuples for thread\n");
            return NULL;
          }
        }
@@ -263,11 +247,11 @@ void *transRequest(void *threadarg) {
          headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
          size=sizeof(objheader_t)+classsize[headeraddr->type];
          if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
-           perror("Error sending obj modified for thread");
+           perror("Error sending obj modified for thread\n");
            return NULL;
          }
        }
-       
+
        //Read message  control message from participant side
        if((n = read(sd, &control, sizeof(char))) <= 0) {
                perror("Error in reading control message from Participant\n");
@@ -280,22 +264,29 @@ void *transRequest(void *threadarg) {
        //Lock and update count
        //Thread sleeps until all messages from pariticipants are received by coordinator
        pthread_mutex_lock(tdata->lock);
-               (*(tdata->count))++;
+
+       (*(tdata->count))++;
        
        if(*(tdata->count) == tdata->pilecount) {
+               if (decideResponse(tdata) != 0) { 
+                       printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+                       pthread_mutex_unlock(tdata->lock);
+                       close(sd);
+                       return NULL;
+               }
                pthread_cond_broadcast(tdata->threshold);
        } else {
                pthread_cond_wait(tdata->threshold, tdata->lock);
        }       
 
-       //process the participant's request
-       if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
-               printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+       pthread_mutex_unlock(tdata->lock);
+       
+       if (sendResponse(tdata, sd) == 0) { 
+               printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
                pthread_mutex_unlock(tdata->lock);
+               close(sd);
                return NULL;
        }
-       pthread_mutex_unlock(tdata->lock);
-
        close(sd);
        pthread_exit(NULL);
 }
@@ -312,6 +303,7 @@ int transCommit(transrecord_t *record) {
        char transid[TID_LEN];
        static int newtid = 0;
        trans_req_data_t *tosend;
+       char treplyctrl = 0, treplyretry = 0; //keep track of the common response that needs to be sent
 
        ptr = record->lookupTable->table;
        size = record->lookupTable->size;
@@ -328,14 +320,12 @@ int transCommit(transrecord_t *record) {
                        //Get machine location for object id
                        
                        if ((machinenum = lhashSearch(curr->key)) == 0) {
-                              printf("Error: No such machine\n");
+                              printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
                               return 1;
                        }
                                        
-                       //TODO only for debug
-                       //machinenum = 1;
                        if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
-                               printf("Error: No such oid\n");
+                               printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
                                return 1;
                        }
                        //Make machine groups
@@ -378,6 +368,7 @@ int transCommit(transrecord_t *record) {
                                
        pListMid(pile, listmid);
        //Process each machine group
+       //TODO: move the body of this while loop into a function of its own
        while(tmp != NULL) {
                //Create transaction id
                newtid++;
@@ -403,6 +394,8 @@ int transCommit(transrecord_t *record) {
                thread_data_array[numthreads].threshold = &tcond;
                thread_data_array[numthreads].lock = &tlock;
                thread_data_array[numthreads].count = &trecvcount;
+               thread_data_array[numthreads].replyctrl = &treplyctrl;
+               thread_data_array[numthreads].replyretry = &treplyretry;
                thread_data_array[numthreads].rec = record;
 
                rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);  
@@ -410,10 +403,8 @@ int transCommit(transrecord_t *record) {
                        perror("Error in pthread create");
                        return 1;
                }               
-
                numthreads++;           
                //TODO frees 
-               //free(tosend);
                tmp = tmp->next;
        }
 
@@ -431,11 +422,18 @@ int transCommit(transrecord_t *record) {
        //Free resources        
        pthread_cond_destroy(&tcond);
        pthread_mutex_destroy(&tlock);
-//     for(i = 0 ;i< pilecount ;i++) {
-               free(tosend);
-//     }
+
+       free(tosend);
        free(listmid);
        pDelete(pile);
+       if(treplyretry == 1) {
+               //wait a random amount of time
+               randomdelay();
+               //sleep(1);
+               //Retry committing the transaction
+               transCommit(record);
+       }       
+       
        return 0;
 }
 
@@ -499,7 +497,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
                        chashInsert(record->lookupTable, oid, objcopy); 
                        break;
                default:
-                       printf("Error in recv request from participant on a READ_REQUEST\n");
+                       printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
                        return NULL;
        }
        close(sd);