Various bug fixes
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index d4d9bb17b08d8ecc16a7355087e8120f979eef56..369b34537d2b38e1f3ec97e6fc2faa508e54d17e 100644 (file)
@@ -38,7 +38,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                //Look up in Machine lookup table and found
-               printf("oid not found in local cache\n");
+               printf("oid is found in Local mlookup\n");
                tmp = mhashSearch(oid);
                size = sizeof(objheader_t)+classsize[tmp->type];
                //Copy into cache
@@ -49,11 +49,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                return(objcopy);
        } else {
                //Get the object from the remote location
-               //printf("oid not found in local machine lookup\n");
-               printf("machinenumber = %d\n",machinenumber);
-               printf("oid = %d\n",oid);
+               printf("oid is found in remote machine\n");
                machinenumber = lhashSearch(oid);
-               printf("machinenumber = %d\n",machinenumber);
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
                        //If object is not found in Remote location
@@ -79,11 +76,12 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
 }
 //int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
 int decideResponse(thread_data_array_t *tdata, int sd, int val) {
-       int i, n, N, sum, oidcount = 0;
+       int i, n, N, sum, retval, oidcount = 0;
        int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
        char ctrl, control, *ptr;
        unsigned int *oidnotfound;
        objheader_t *header;
+       
 
        //Check common data structure 
        for (i = 0 ; i < tdata->pilecount ; i++) {
@@ -99,9 +97,11 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
                                free(tdata->rec);
                                //send Abort
                                ctrl = TRANS_ABORT;
-                               if (write(sd, &ctrl, sizeof(char)) < 0) {
-                                       perror("Error sending ctrl message for participant\n");
-                                       return 1;
+                               for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
+                                       if (write(sd, &ctrl, sizeof(char)) < 0) {
+                                               perror("Error sending ctrl message for participant\n");
+                                               return 1;
+                                       }
                                }
                                return 0;
 
@@ -145,24 +145,29 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
                //Send Commit
                ctrl = TRANS_COMMIT;
                printf("Sending TRANS_COMMIT\n");
-               if (write(sd, &ctrl, sizeof(char)) < 0) {
+               if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
                        perror("Error sending ctrl message for participant\n");
                        return 1;
                }
+               //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
        }
 
        if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
                //Send abort but retry commit
                ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
                printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
-               if (write(sd, &ctrl, sizeof(char)) < 0) {
+               if((retval = write(sd, &ctrl, sizeof(char))) <= 0) {
                        perror("Error sending ctrl message for participant\n");
                        return 1;
                }
-               //Sleep
+               //Sleep and the resend the request
                sleep(5);
                //Read new control message from Participant
-               n = read(sd, &control, sizeof(char));
+
+               if((n = read(sd, &control, sizeof(char))) <= 0) {
+                       perror("No bytes are read for participant\n");
+                       return 1;
+               }
                
                //Update common data structure and increment count
                tdata->recvmsg[tdata->thread_id].rcv_status = control;
@@ -171,17 +176,17 @@ int decideResponse(thread_data_array_t *tdata, int sd, int val) {
        }
 
        if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
-               //Send abort but retry commit after relloking up objects
-               //ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING;
+               //Send abort but retry commit after relooking up objects
                ctrl = TRANS_ABORT;
                printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
-               if (write(sd, &ctrl, sizeof(char)) < 0) {
+               if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
                        perror("Error sending ctrl message for participant\n");
                        return 1;
                }
                //TODO
                //Relook up objects
                //update location table
+               
                //Free pointers
                free(oidnotfound);
        }
@@ -200,7 +205,6 @@ void *transRequest(void *threadarg) {
        char machineip[16];
 
        tdata = (thread_data_array_t *) threadarg;
-       printf("DEBUG -> New thread id %d\n", tdata->thread_id);
        //Send Trans Request
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                perror("Error in socket for TRANS_REQUEST");
@@ -213,7 +217,6 @@ void *transRequest(void *threadarg) {
        midtoIP(tdata->mid,machineip);
        machineip[15] = '\0';
        serv_addr.sin_addr.s_addr = inet_addr(machineip);
-       //serv_addr.sin_addr.s_addr = inet_addr(tdata->mid);
 
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
                perror("Error in connect for TRANS_REQUEST");
@@ -237,7 +240,7 @@ void *transRequest(void *threadarg) {
        }
        //Send oids and version number tuples for objects that are read
 //     printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
-//     printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
+//     printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
        if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
                perror("Error sending tuples for thread");
                return NULL;
@@ -245,7 +248,6 @@ void *transRequest(void *threadarg) {
        //Send objects that are modified
        for(i = 0; i < tdata->buffer->f.nummod ; i++) {
                headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
-//             printf("DEBUG -> Bytes sent for oid = %d modified %d\n", *((int *)headeraddr), sizeof(objheader_t) + classsize[headeraddr->type]);
                if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type])  < 0) {
                        perror("Error sending obj modified for thread");
                        return NULL;
@@ -253,9 +255,11 @@ void *transRequest(void *threadarg) {
        }
        
        //Read message  control message from participant side
-       n = read(sd, &control, sizeof(char));
+       if((n = read(sd, &control, sizeof(char))) <= 0) {
+               perror("Error in reading control message from Participant\n");
+               return NULL;
+       }
        recvcontrol = control;
-       printf("DEBUG -> After TRANS_REQUEST, message control recv is %d\n", recvcontrol);
        
        //Update common data structure and increment count
        tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
@@ -266,15 +270,16 @@ void *transRequest(void *threadarg) {
        
        if(*(tdata->count) == tdata->pilecount) {
                pthread_cond_broadcast(tdata->threshold);
-               //process the participant's request
-               if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
-                       printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(tdata->lock);
-                       return NULL;
-               }
        } else {
                pthread_cond_wait(tdata->threshold, tdata->lock);
        }       
+
+       //process the participant's request
+       if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
+               printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+               pthread_mutex_unlock(tdata->lock);
+               return NULL;
+       }
        pthread_mutex_unlock(tdata->lock);
 
        close(sd);
@@ -292,6 +297,7 @@ int transCommit(transrecord_t *record) {
        char buffer[RECEIVE_BUFFER_SIZE],control;
        char transid[TID_LEN];
        static int newtid = 0;
+       trans_req_data_t *tosend;
 
        ptr = record->lookupTable->table;
        size = record->lookupTable->size;
@@ -337,7 +343,10 @@ int transCommit(transrecord_t *record) {
        pthread_cond_t tcond;
        pthread_mutex_t tlock;
        pthread_mutex_t tlshrd;
-       thread_data_array_t thread_data_array[pilecount];
+       //thread_data_array_t thread_data_array[pilecount];
+       thread_data_array_t *thread_data_array;
+
+       thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
        
        thread_response_t rcvd_control_msg[pilecount];  //Shared thread array that keeps track of responses of participants
                
@@ -356,10 +365,9 @@ int transCommit(transrecord_t *record) {
        pListMid(pile, listmid);
        //Process each machine group
        while(tmp != NULL) {
-               printf("DEBUG -> Created thread %d... \n", numthreads);
                //Create transaction id
                newtid++;
-               trans_req_data_t *tosend;
+               //trans_req_data_t *tosend;
                if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
                        printf("Calloc error %s, %d\n", __FILE__, __LINE__);
                        return 1;
@@ -388,9 +396,10 @@ int transCommit(transrecord_t *record) {
                        perror("Error in pthread create");
                        return 1;
                }               
+
                numthreads++;           
                //TODO frees 
-               free(tosend);
+               //free(tosend);
                tmp = tmp->next;
        }
 
@@ -408,6 +417,9 @@ int transCommit(transrecord_t *record) {
        //Free resources        
        pthread_cond_destroy(&tcond);
        pthread_mutex_destroy(&tlock);
+//     for(i = 0 ;i< pilecount ;i++) {
+               free(tosend);
+//     }
        free(listmid);
        pDelete(pile);
        return 0;
@@ -415,7 +427,7 @@ int transCommit(transrecord_t *record) {
 
 //mnun will be used to represent the machine IP address later
 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
-       int sd, size;
+       int sd, size, val;
        struct sockaddr_in serv_addr;
        struct hostent *server;
        char control;
@@ -424,7 +436,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        void *objcopy;
 
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-               perror("Error in socket");
+               perror("Error in socket\n");
                return NULL;
        }
        bzero((char*) &serv_addr, sizeof(serv_addr));
@@ -436,14 +448,14 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        serv_addr.sin_addr.s_addr = inet_addr(machineip);
 
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
-               perror("Error in connect");
+               perror("Error in connect\n");
                return NULL;
        }
        char readrequest[sizeof(char)+sizeof(unsigned int)];
        readrequest[0] = READ_REQUEST;
        *((unsigned int *)(&readrequest[1])) = oid;
        if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
-               perror("Error sending message");
+               perror("Error sending message\n");
                return NULL;
        }
 
@@ -451,15 +463,24 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        printf("DEBUG -> ready to rcv ...\n");
 #endif
        //Read response from the Participant
-       read(sd, &control, sizeof(char));
+       if((val = read(sd, &control, sizeof(char))) <= 0) {
+               perror("No control response for getRemoteObj sent\n");
+               return NULL;
+       }
        switch(control) {
                case OBJECT_NOT_FOUND:
                        return NULL;
                        break;
                case OBJECT_FOUND:
-                       read(sd, &size, sizeof(int));
+                       if((val = read(sd, &size, sizeof(int))) <= 0) {
+                               perror("No size is read from the participant\n");
+                               return NULL;
+                       }
                        objcopy = objstrAlloc(record->cache, size);
-                       read(sd, objcopy, size);                
+                       if((val = read(sd, objcopy, size)) <= 0) {
+                               perror("No objects are read from the remote participant\n");
+                               return NULL;
+                       }
                        //Insert into cache's lookup table
                        chashInsert(record->lookupTable, oid, objcopy); 
                        break;
@@ -467,5 +488,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
                        printf("Error in recv request from participant on a READ_REQUEST\n");
                        return NULL;
        }
+       close(sd);
        return objcopy;
 }