Fixed bugs
authoradash <adash>
Wed, 20 Jun 2007 13:02:45 +0000 (13:02 +0000)
committeradash <adash>
Wed, 20 Jun 2007 13:02:45 +0000 (13:02 +0000)
a) changed code to retry commit after a random delay (soft abort case)
b) fixed sending n copies of TRANS_ABORT to all threads (was only sent once before)
c) separated multithreaded from single-threaded functionality (was mixed before)
d) tested against all test cases

Working version for TRANS_COMMIT and TRANS_ABORT

Robust/src/Runtime/DSTM/interface/Makefile
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/ip.c
Robust/src/Runtime/DSTM/interface/testserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 2f841f28ccdf0cc0035cbc0f146f20e017e130d5..f01dcef96860aab1606d8986faa42c4e76db0e43 100644 (file)
@@ -1,22 +1,22 @@
-d-2:
-       gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+d-3:
+       gcc -dr -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
 
 demsky:
-       gcc -DDEBUG -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+       gcc -DDEBUG -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
 
-d-1:
-       gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+d-4:
+       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
 
 all:
-       gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
-       gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
-       gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+       gcc -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+       gcc -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+       gcc -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
 
 
 mac:
-       gcc -DMAC -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
-       gcc -DMAC -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
-       gcc -DMAC -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+       gcc -DMAC -lpthread -g -o d-3 trans.c testd-3.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+       gcc -DMAC -lpthread -g -o demsky dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+       gcc -DMAC -lpthread -g -o d-4 trans.c testd-4.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
 
 clean:
-       rm -rf d-2 d-1 demsky
+       rm -rf d-3 d-4 demsky
index de0af029e82499052944a8208f20248d458b0586..d46d47fbe738e3dd542e377f90bb400e16acd510 100644 (file)
@@ -13,7 +13,6 @@
 #define        TRANS_REQUEST           5
 #define        TRANS_ABORT             6
 #define TRANS_COMMIT           7
-#define TRANS_ABORT_BUT_RETRY_COMMIT   8
 #define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING   9
 
 //Participant Messages
@@ -31,9 +30,6 @@
 #define OBJ_LOCKED_BUT_VERSION_MATCH   14
 #define OBJ_UNLOCK_BUT_VERSION_MATCH   15
 #define VERSION_NO_MATCH               16
-//TODO REMOVE THIS
-#define NO_MISSING_OIDS                        22
-#define MISSING_OIDS_PRESENT           23
 
 
 #include <stdlib.h>
@@ -67,13 +63,6 @@ typedef struct transrecord {
        objstr_t *cache;
        chashtable_t *lookupTable;
 } transrecord_t;
-/*
-typedef struct pile {
-       unsigned int mid;
-       unsigned int oid;
-       struct pile *next;
-}pile_t;
-*/
 // Structure that keeps track of responses from the participants
 typedef struct thread_response {
        char rcv_status;
@@ -108,9 +97,14 @@ typedef struct thread_data_array {
        pthread_cond_t *threshold; //threshhold for waking up a thread
        pthread_mutex_t *lock;    //lock the count variable
        int *count;             //variable to count responses of TRANS_REQUEST protocol from all participants
+       char *replyctrl;        //shared ctrl message that stores the reply to be sent, filled by decideResp
+       char *replyretry;       //shared variable to find out if we need retry (TRANS_COMMIT case) 
        transrecord_t *rec;     // To send modified objects
 }thread_data_array_t;
 
+
+
+
 // Structure to save information about an oid necesaary for the decideControl()
 typedef struct objinfo {
        unsigned int oid;
@@ -152,7 +146,8 @@ char handleTransReq(int, fixed_data_t *, trans_commit_data_t *, unsigned int *,
 transrecord_t *transStart();
 objheader_t *transRead(transrecord_t *record, unsigned int oid);
 objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid
-int decideResponse(thread_data_array_t *tdata, int sd, int status);// Coordinator decides what response to send to the participant
+int decideResponse(thread_data_array_t *tdata);// Coordinator decides what response to send to the participant
+char sendResponse(thread_data_array_t *tdata, int sd); //Sends control message back to Participants
 void *transRequest(void *);    //the C routine that the thread will execute when TRANS_REQUEST begins
 int transCommit(transrecord_t *record); //return 0 if successful
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
index 34487ff00dda826e000af1c5a0e6fd4e9fac56a0..9590a1c93c98673bd92948f2f9975746bf8b1735 100644 (file)
@@ -97,6 +97,9 @@ void *dstmAccept(void *acceptfd)
 
        printf("Recieved connection: fd = %d\n", (int)acceptfd);
        if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
+               if (retval == 0) {
+                       return; // Testing connection
+               }
                perror("Error in receiving control from coordinator\n");
                return;
        }
@@ -106,7 +109,6 @@ void *dstmAccept(void *acceptfd)
                                perror("Error receiving object from cooridnator\n");
                                return;
                        }
-                       printf("DEBUG -> Recv READ_REQUEST from Coordinator for oid = %d\n", oid);
                        srcObj = mhashSearch(oid);
                        h = (objheader_t *) srcObj;
                        size = sizeof(objheader_t) + sizeof(classsize[h->type]);
@@ -141,7 +143,7 @@ void *dstmAccept(void *acceptfd)
                        break;
 
                case TRANS_REQUEST:
-                       printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
+                       printf("DEBUG -> Recv TRANS_REQUEST\n");
                        if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
                                printf("Error in readClientReq\n");
                        }
@@ -156,9 +158,9 @@ void *dstmAccept(void *acceptfd)
                printf("Closed connection: fd = %d\n", (int)acceptfd);
        
        pthread_exit(NULL);
-       printf("DEBUG -> Exiting dstmAccept\n");
 }
 
+// Reads transaction request per thread
 int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
        char *ptr, control, prevctrl, sendctrl, newctrl;
        void *modptr, *header;
@@ -191,7 +193,8 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
        int numread = fixed.numread;
        N = numread * (sizeof(unsigned int) + sizeof(short));
        char objread[N];
-       if(numread != 0) { // If pile contains objects to be read 
+       if(numread != 0) { //If pile contains at least one object to be read,
+                         // keep reading all objects
                sum = 0;
                do {
                        n = recv((int)acceptfd, (void *) objread, N, 0);
@@ -200,9 +203,10 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
        }
        
        // Read modified objects
-       if(fixed.nummod != 0) { // If pile contains modified objects 
+       if(fixed.nummod != 0) { // If pile contains at least one modified object,
+                               // allocate new object store and recv all modified objects
                if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
-                       printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
+                       printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
                        return 1;
                }
                sum = 0;
@@ -212,26 +216,20 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
                } while (sum < fixed.sum_bytes && n != 0);
        }
 
+       // Process the information available in the TRANS_REQUEST control
        //Send control message as per all votes from all oids in the machine
        if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
-               printf("Handle req error\n");
+               printf("Handle Trans Req error %s, %d\n", __FILE__, __LINE__);
+               return 1;
        }
-
        //Read for new control message from Coordiator
        if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
-               perror("Error in receiving control message");
+               perror("Error in receiving control message\n");
                return 1;
        }
 
        switch(control) {
                case TRANS_ABORT:
-                       printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
-                       //send ack to coordinator
-                       sendctrl = TRANS_SUCESSFUL;
-                       if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-                               perror("Error sending ACK to coordinator\n");
-                               return 1;
-                       }
                        //Mark all ref counts as 1 and do garbage collection
                        ptr = modptr;
                        for(i = 0; i< fixed.nummod; i++) {
@@ -241,66 +239,31 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
                        }
                        //Unlock objects that was locked in this machine due to this transaction
                        for(i = 0; i< transinfo->numlocked; i++) {
+                               printf("DEBUG-> Unlocking objects\n");
                                header = mhashSearch(transinfo->objlocked[i]);// find the header address
                                ((objheader_t *)header)->status &= ~(LOCK);             
                        }
+               
+                       //send ack to coordinator
+                       printf("DEBUG -> Recv TRANS_ABORT\n");
+                       sendctrl = TRANS_SUCESSFUL;
+                       if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
+                               perror("Error sending ACK to coordinator\n");
+                               return 1;
+                       }
+               
                        ptr = NULL;
                        return 0;
                case TRANS_COMMIT:
-                       printf("DEBUG -> Recv TRANS_COMMIT from Coordinator accept_fd = %d\n", acceptfd);
+                       printf("DEBUG -> Recv TRANS_COMMIT \n");
                        if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
                                printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
                        }
                        break;
-               case TRANS_ABORT_BUT_RETRY_COMMIT:
-                       printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
-                       //Process again after waiting for sometime and on prev control message sent
-                       sleep(2);
-                       switch(prevctrl) {
-                               case TRANS_AGREE:
-                                       sendctrl = TRANS_AGREE;
-                                       if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-                                               perror("Error sending ACK to coordinator\n");
-                                       }
-                                       break;
-                               case TRANS_SOFT_ABORT:
-                                       if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
-                                               printf("Handle req error\n");
-                                       }
-                                       //If no change in previous control message that was sent then ABORT transaction
-                                       if(newctrl == TRANS_SOFT_ABORT){
-                                               //Send ABORT
-                                               newctrl = TRANS_DISAGREE;
-                                               if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-                                                       perror("Error sending ACK to coordinator\n");
-                                               }
-                                               //Set the reference count of the object to 1 in mainstore for garbage collection
-                                               ptr = modptr;
-                                               for(i = 0; i< fixed.nummod; i++) {
-                                                       tmp_header = (objheader_t *) ptr;
-                                                       tmp_header->rcount = 1;
-                                                       ptr += sizeof(objheader_t) + classsize[tmp_header->type];
-                                               }
-                                               //Unlock objects that was locked in this machine due to this transaction
-                                               for(i = 0; i< transinfo->numlocked; i++) {
-                                                       ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
-                                                       ((objheader_t *)ptr)->status &= ~(LOCK);                
-                                               }
-                                       } else if(newctrl == TRANS_AGREE) {
-                                               newctrl = TRANS_AGREE;
-                                               //Send new control message
-                                               if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-                                                       perror("Error sending ACK to coordinator\n");
-                                               }
-                                       }
-
-                                       break;
-                       }
 
-                       break;
                case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
                        //TODO expect another transrequest from client
-                       printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
+                       printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
                        break;
                default:
                        printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
@@ -308,7 +271,7 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
                        break;
        }
        //Free memory
-       printf("DEBUG -> Freeing...");
+       printf("DEBUG -> Freeing...\n");
        fflush(stdout);
        if (transinfo->objmod != NULL) {
                free(transinfo->objmod);
@@ -325,8 +288,8 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
        return 0;
 }
 
-//This function runs a decision after all objects are weighed under one of the 4 possibilities 
-//and returns the appropriate control message to the Ccordinator 
+//This function makes a decision after processing all objects involved in the TRANS_REQUEST
+//and returns the appropriate control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT to the Coordinator
 char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
        int val;
        short version;
@@ -339,6 +302,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
        oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
        oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
        // Counters and arrays to formulate decision on control message to be sent
+       // version match or  no match
        int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
        int objmodnotfound = 0, nummodfound = 0;
        void *mobj;
@@ -346,9 +310,9 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
        
        //Process each object present in the pile 
        ptr = modptr;
-       //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
-       fflush(stdout);
-       //Process each oid in the machine pile/ group
+       
+       //Process each oid in the machine pile/ group per thread
+       //Should be a new function
        for (i = 0; i < fixed->numread + fixed->nummod; i++) {
                if (i < fixed->numread) {//Object is read
                        int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
@@ -373,6 +337,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
                        //Check if obj is locked
                        if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {           
                                if (version == ((objheader_t *)mobj)->version) {      // If version match
+                                       printf("DEBUG -> obj = %d locked\n", ((objheader_t *)mobj)->oid);
                                        v_matchlock++;
                                } else {//If versions don't match ..HARD ABORT
                                        v_nomatch++;
@@ -382,16 +347,16 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
                                                perror("Error in sending control to the Coordinator\n");
                                                return 0;
                                        }
-                                       printf("DEBUG -> Sending TRANS_DISAGREE\n");
+                                       printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd);
                                        return control;
                                }
                        } else {//Obj is not locked , so lock object
                                ((objheader_t *)mobj)->status |= LOCK;
-                               //FOR TESTING
+                               // TESTING Add sleep to make transactions run for a long time such that 
+                               // we can test for soft abort case
                                sleep(1);
                                //Save all object oids that are locked on this machine during this transaction request call
                                oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
-                               printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid);
                                objlocked++;
                                if (version == ((objheader_t *)mobj)->version) { //If versions match
                                        v_matchnolock++;
@@ -409,16 +374,9 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
                        }
                }
        }
-
-       printf("No of objs locked = %d\n", objlocked);
-       printf("No of v_nomatch = %d\n", v_nomatch);
-       printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
-       printf("No of objs v_match but had locks before = %d\n", v_matchlock);
-       printf("No of objs not found = %d\n", objnotfound);
-       printf("No of objs modified but not found = %d\n", objmodnotfound);
-
+       
        //Decide what control message(s) to send
-       //Cond to send TRANS_AGREE
+       // Should be a new function
        if(v_matchnolock == fixed->numread + fixed->nummod) {
                //send TRANS_AGREE to Coordinator
                control = TRANS_AGREE;
@@ -430,19 +388,19 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
        }
        //Condition to send TRANS_SOFT_ABORT
        if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
-               //send TRANS_SOFT_ABORT to Coordinator
                control = TRANS_SOFT_ABORT;
                char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
-               *((int*)&msg[1])=objnotfound;
+               *((int*)&msg[1])= objnotfound;
 
                printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
-               //send number of oids not found and the missing oids 
+               //Sending control message
                if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
                        perror("Error in sending no of objects that are not found\n");
                        return 0;
                }
+               //send number of oids not found and the missing oids if objects are missing in the machine
                if(objnotfound != 0) { 
-                 int size=sizeof(unsigned int)*objnotfound;
+                 int size = sizeof(unsigned int)*objnotfound;
                  if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
                                perror("Error in sending objects that are not found\n");
                                return 0;
@@ -478,7 +436,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
        return control;
 }
 
-//Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
+//Processes oids in the TRANS_COMMIT request at the participant end and sends an ACK back to the Coordinator
 int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
        objheader_t *header;
        int i = 0, offset = 0;
@@ -486,7 +444,7 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
        //Process each modified object saved in the mainobject store
        for(i=0; i<transinfo->nummod; i++) {
                if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
-                       printf("mhashserach returns NULL\n");
+                       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
                }
                //change reference count of older address and free space in objstr ??
                header->rcount = 1; //Not sure what would be th val
@@ -509,8 +467,7 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
 
        //send ack to coordinator
        control = TRANS_SUCESSFUL;
-       //FOR TESTING
-       printf("DEBUG-> Transaction is SUCCESSFUL \n");
+       printf("DEBUG-> TRANS_SUCESSFUL\n");
        if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
                perror("Error sending ACK to coordinator\n");
        }
index e15e89bb86049c7dc46a501304e3b7f4ba9f278f..d959f49c2a110d1fa69021f25b0fe3c8b022d4c0 100644 (file)
@@ -51,7 +51,6 @@ int checkServer(int mid, char *machineip) {
        while (connect(tmpsd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
                sleep(1);
        }
-       printf("DEBUG -> Connection established with %s\n", machineip);
        close(tmpsd);
        return 0;
 }
index c8cd19c38cab448ad028655e6296fbb4cd08e5b7..61e397db1434aaf69010b4cef593bfe08f679ede 100644 (file)
@@ -45,7 +45,7 @@ void init_obj(objheader_t *h, unsigned int oid, unsigned short type, \
 int main()
 {
        //test1();
-       //test3();
+//     test3();
        test4();
 }
 
@@ -184,24 +184,23 @@ int test3() {
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable
-       mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
+       mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu
        lhashInsert(20, mid);
        lhashInsert(21, mid);
        lhashInsert(22, mid);
 
-       mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+       mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
        //Inserting into lhashtable
        lhashInsert(31, mid);
        lhashInsert(32, mid);
        lhashInsert(33, mid);
        pthread_create(&thread_Listen, &attr, dstmListen, NULL);
-       //      pthread_create(&thread_Listen, NULL, dstmListen, NULL);
 
-       //Check if machine d-1 is up and running
-       checkServer(mid, "128.200.9.26");
-       mid = iptoMid("128.200.9.27");
-       //Check if machine d-2 is up and running
-       checkServer(mid, "128.200.9.27");
+       //Check if machine d-4 is up and running
+       checkServer(mid, "128.200.9.30");
+       mid = iptoMid("128.200.9.29");
+       //Check if machine d-3 is up and running
+       checkServer(mid, "128.200.9.29");
 
        // Start Transaction    
        myTrans = transStart();
@@ -277,46 +276,45 @@ int test4() {
        lhashInsert(header->oid, mid);
 
        //Inserting into lhashtable
-       mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
+       mid = iptoMid("128.200.9.29"); //d-3.eecs.uci.edu
        lhashInsert(20, mid);
        lhashInsert(21, mid);
        lhashInsert(22, mid);
 
-       mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+       mid = iptoMid("128.200.9.30"); //d-4.eecs.uci.edu
        //Inserting into lhashtable
        lhashInsert(31, mid);
        lhashInsert(32, mid);
        lhashInsert(33, mid);
-       pthread_create(&thread_Listen, &attr, dstmListen, NULL);
-       //      pthread_create(&thread_Listen, NULL, dstmListen, NULL);
 
-       //Check if machine d-1 is up and running
-       checkServer(mid, "128.200.9.26");
-       mid = iptoMid("128.200.9.27");
-       //Check if machine d-2 is up and running
-       checkServer(mid, "128.200.9.27");
+       pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+       //Check if machine d-4 is up and running
+       checkServer(mid, "128.200.9.30");
+       mid = iptoMid("128.200.9.29");
+       //Check if machine d-3 is up and running
+       checkServer(mid, "128.200.9.29");
 
        // Start Transaction    
        myTrans = transStart();
 
        //read object 1(present in local machine)
-       if((h1 = transRead(myTrans, 1)) == NULL){
+       if((h1 = transRead(myTrans, 2)) == NULL){
                printf("Object not found\n");
        }
+
        //read object 2present in local machine)
-       if((h2 = transRead(myTrans, 2)) == NULL) {
+       if((h2 = transRead(myTrans, 1)) == NULL) {
                printf("Object not found\n");
        }
-       //read object 31(present in d-1 machine)
+       //read object 31(present in d-4 machine)
        if((h3 = transRead(myTrans, 31)) == NULL) {
                printf("Object not found\n");
        }
-       
-       //read object 21(present in d-2 machine)
+       //read object 21(present in d-3 machine)
        if((h4 = transRead(myTrans, 21)) == NULL) {
                printf("Object not found\n");
        }
-
+       
        // Commit transaction
        transCommit(myTrans);
 
@@ -350,7 +348,6 @@ int test5() {
        lhashInsert(32, mid);
        lhashInsert(33, mid);
        pthread_create(&thread_Listen, &attr, dstmListen, NULL);
-//     pthread_create(&thread_Listen, NULL, dstmListen, NULL);
 
        printf("DEBUG -> mid = %d\n", mid);
        checkServer(mid, "128.200.9.26");
index 0af91b9dae288dc1727ebed7533abc1856fcc982..e9daecf759fbdb258f56d6c4c25ff1823aa8edbf 100644 (file)
@@ -9,6 +9,9 @@
 #include<sys/socket.h>
 #include<netdb.h>
 #include<netinet/in.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <time.h>
 
 #define LISTEN_PORT 2156
 #define MACHINE_IP "127.0.0.1"
 
 extern int classsize[];
 
+void randomdelay(void) // Sleeps the calling thread for a short, time-derived interval (see tv_nsec below)
+{
+       struct timespec req, rem; // req: requested sleep interval; rem: filled by nanosleep, never read here
+       time_t t;
+
+       t = time(NULL); // current calendar time; this is the only source of variation in the delay
+       req.tv_sec = 0;
+       req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
+       nanosleep(&req, &rem); // NOTE(review): return value ignored, so an interrupted sleep is not resumed
+       return;
+}
+
 transrecord_t *transStart()
 {
        transrecord_t *tmp = malloc(sizeof(transrecord_t));
@@ -26,7 +41,6 @@ transrecord_t *transStart()
 
 objheader_t *transRead(transrecord_t *record, unsigned int oid)
 {      
-       printf("Enter TRANS_READ\n");
        unsigned int machinenumber;
        objheader_t *tmp, *objheader;
        void *objcopy;
@@ -34,7 +48,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
        void *buf;
                //check cache
        if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
-               printf("DEBUG -> transRead oid %d found local\n", oid);
+               //printf("DEBUG -> transRead oid %d found local\n", oid);
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                //Look up in Machine lookup table and found
@@ -55,11 +69,11 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
                        //If object is not found in Remote location
-                       printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
+                       //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
                        return NULL;
                }
                else {
-                       printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
+                       //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
                        return(objcopy);
                }
        } 
@@ -77,127 +91,105 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
        chashInsert(record->lookupTable, tmp->oid, tmp);
        return tmp;
 }
-//int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
-int decideResponse(thread_data_array_t *tdata, int sd, int val) {
-       int i, n, N, sum, retval, oidcount = 0;
-       int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
-       char ctrl, control, *ptr;
-       unsigned int *oidnotfound;
-       objheader_t *header;
 
-       printf("DEBUG -> pilecount is %d\n", tdata->pilecount);
+//This function decides the response that needs to be sent to all other machines involved in a 
+//transaction by the machine that initiated the transaction request
+
+int decideResponse(thread_data_array_t *tdata) {
+       char control;
+       int i, transagree = 0, transdisagree = 0, transsoftabort = 0;
+
        //Check common data structure 
        for (i = 0 ; i < tdata->pilecount ; i++) {
                //Switch case
                control = tdata->recvmsg[i].rcv_status;
                switch(control) {
                        case TRANS_DISAGREE:
-                               printf("DEBUG-> Inside TRANS_DISAGREE\n");
+                               printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
                                transdisagree++;
-                               //Free transaction records
-                               objstrDelete(tdata->rec->cache);
-                               chashDelete(tdata->rec->lookupTable);
-                               free(tdata->rec);
-                               //send Abort
-                               ctrl = TRANS_ABORT;
-                               for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
-                                       if (send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
-                                               perror("Error sending ctrl message for participant\n");
-                                               return 1;
-                                       }
-                               }
-                               return 0;
+                               break;
 
                        case TRANS_AGREE:
-                               printf("DEBUG-> Inside TRANS_AGREE\n");
-                               PRINT_TID(tdata);
+                               printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
                                transagree++;
                                break;
                                
                        case TRANS_SOFT_ABORT:
-                               printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
+                               printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
                                transsoftabort++;
-                               /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
-                               if ((i == tdata->thread_id) && (val == 0)) {
-                                       //Read list of objects missing
-                                       if(read(sd, &oidcount, sizeof(int)) != 0) {
-                                               //Break if only objs are locked at the Participant side
-                                               if (oidcount == 0) {
-                                                       break;
-                                               }
-                                               transsoftabortmiss++;
-                                               N = oidcount * sizeof(unsigned int);
-                                               if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
-                                                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-                                               }
-                                               ptr = (char *) oidnotfound;
-                                               do {
-                                                       n = read(sd, ptr+sum, N-sum);
-                                                       sum += n;
-                                               } while(sum < N && n !=0);
-                                       }
-                               }
-
                                break;
                        default:
-                               printf("Participant sent unknown message\n");
+                               printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+                               return -1;
                }
        }
        
        //Decide what control message to send to Participant    
-       if(transagree == tdata->pilecount){
+       if(transdisagree > 0) {
+               //Send Abort
+               *(tdata->replyctrl) = TRANS_ABORT;
+               printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
+               objstrDelete(tdata->rec->cache);
+               chashDelete(tdata->rec->lookupTable);
+               free(tdata->rec);
+       } else if(transagree == tdata->pilecount){
                //Send Commit
-               ctrl = TRANS_COMMIT;
-               printf("Sending TRANS_COMMIT\n");
-               if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
-                       perror("Error sending ctrl message for participant\n");
-                       return 1;
-               }
-               //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
+               *(tdata->replyctrl) = TRANS_COMMIT;
+               printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
+               objstrDelete(tdata->rec->cache);
+               chashDelete(tdata->rec->lookupTable);
+               free(tdata->rec);
+       } else if(transsoftabort > 0 && transdisagree == 0) {
+               //Send Abort
+               *(tdata->replyctrl) = TRANS_ABORT;
+               *(tdata->replyretry) = 1;
+               //objstrDelete(tdata->rec->cache);
+               //chashDelete(tdata->rec->lookupTable);
+               //free(tdata->rec);
+               printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
+       } else {
+               printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
+               return -1;
        }
+       
+       return 0;
+}
+//Sends the decided reply (*tdata->replyctrl) to this thread's participant on socket sd.
+//Returns that reply when it is TRANS_ABORT or TRANS_COMMIT; otherwise TRANS_SOFT_ABORT or 0.
+char sendResponse(thread_data_array_t *tdata, int sd) {
+       int n, N, sum = 0, oidcount = 0;
+       char *ptr, retval = 0;
+       unsigned int *oidnotfound;
 
-       if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
-               //Send abort but retry commit
-               ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
-               printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
-               if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
-                       perror("Error sending ctrl message for participant\n");
-                       return 1;
-               }
-       /*      
-               //Sleep and the resend the request
-               sleep(2);
-               //Read new control message from Participant
-
-               if((n = read(sd, &control, sizeof(char))) <= 0) {
-                       perror("No bytes are read for participant\n");
-                       return 1;
+       //If the decided response is due to a soft abort and missing objects at the Participant's side
+       if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
+               //Read the count of missing objects; a short read or a non-positive count means there is no list
+               if((read(sd, &oidcount, sizeof(int)) == (int)sizeof(int)) && (oidcount > 0)) {
+                       N = oidcount * sizeof(unsigned int);
+                       if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
+                               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                       }
+                       ptr = (char *) oidnotfound;
+                       //Read the oid list; stop at end of stream, on a read error, or if there is no buffer
+                       while(ptr != NULL && sum < N && (n = read(sd, ptr+sum, N-sum)) > 0) {
+                               sum += n;
+                       }
+                       free(oidnotfound); //nothing consumes the list yet, so release it
                 }
-               
-               //Update common data structure and increment count
-               tdata->recvmsg[tdata->thread_id].rcv_status = control;
-               val = 1;
-               decideResponse(tdata, sd, val);         //Second call to decideResponse(); indicated by parameter val = 1
-       */      
+               retval = TRANS_SOFT_ABORT;
         }
-
-       if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
-               //Send abort but retry commit after relooking up objects
-               ctrl = TRANS_ABORT;
-               printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
-               if((retval = send(sd, &ctrl, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-                       perror("Error sending ctrl message for participant\n");
-                       return 1;
-               }
-               //TODO
-               //Relook up objects
-               //update location table
-               
-               //Free pointers
-               free(oidnotfound);
+       //If the decided response is TRANS_ABORT
+       if(*(tdata->replyctrl) == TRANS_ABORT) {
+               retval = TRANS_ABORT;
         }
-       
-       return 0;
+       if(*(tdata->replyctrl) == TRANS_COMMIT) {
+               retval = TRANS_COMMIT;
+       }
+       // Send response to the Participant; compare as signed so that a -1 return is detected
+       if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < (int)sizeof(char)) {
+               perror("Error sending ctrl message for participant\n");
+       }
+       return retval;
 }
 
 void *transRequest(void *threadarg) {
@@ -206,54 +198,46 @@ void *transRequest(void *threadarg) {
        struct hostent *server;
        thread_data_array_t *tdata;
        objheader_t *headeraddr;
-       //unsigned int *oidnotfound;
        char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
-       char machineip[16];
+       char machineip[16], retval;
 
        tdata = (thread_data_array_t *) threadarg;
        //Send Trans Request
        if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-               perror("Error in socket for TRANS_REQUEST");
+               perror("Error in socket for TRANS_REQUEST\n");
                return NULL;
        }
        bzero((char*) &serv_addr, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(LISTEN_PORT);
-       //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
        midtoIP(tdata->mid,machineip);
        machineip[15] = '\0';
        serv_addr.sin_addr.s_addr = inet_addr(machineip);
 
        if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
-               perror("Error in connect for TRANS_REQUEST");
+               perror("Error in connect for TRANS_REQUEST\n");
                return NULL;
        }
-
-       //Multiple writes for sending packets of data 
-       //Send first few fixed bytes of the TRANS_REQUEST protocol
-       printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
-//     printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
-//     printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
+       
+       printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
+       //Send bytes of data with TRANS_REQUEST control message
        if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
-               perror("Error sending fixed bytes for thread");
+               perror("Error sending fixed bytes for thread\n");
                return NULL;
        }
        //Send list of machines involved in the transaction
-//     printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
        {
          int size=sizeof(unsigned int)*tdata->pilecount;
          if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
-           perror("Error sending list of machines for thread");
+           perror("Error sending list of machines for thread\n");
            return NULL;
          }
        }
        //Send oids and version number tuples for objects that are read
-//     printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
-//     printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
        {
          int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
          if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
-           perror("Error sending tuples for thread");
+           perror("Error sending tuples for thread\n");
            return NULL;
          }
        }
@@ -263,11 +247,11 @@ void *transRequest(void *threadarg) {
          headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
          size=sizeof(objheader_t)+classsize[headeraddr->type];
          if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
-           perror("Error sending obj modified for thread");
+           perror("Error sending obj modified for thread\n");
            return NULL;
          }
        }
-       
+
        //Read message  control message from participant side
        if((n = read(sd, &control, sizeof(char))) <= 0) {
                perror("Error in reading control message from Participant\n");
@@ -280,22 +264,29 @@ void *transRequest(void *threadarg) {
        //Lock and update count
        //Thread sleeps until all messages from pariticipants are received by coordinator
        pthread_mutex_lock(tdata->lock);
-               (*(tdata->count))++;
+
+       (*(tdata->count))++;
        
        if(*(tdata->count) == tdata->pilecount) {
+               if (decideResponse(tdata) != 0) { 
+                       printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+                       //No decision was reached: fall back to TRANS_ABORT rather than leave the waiters blocked
+                       *(tdata->replyctrl) = TRANS_ABORT;
+               }
+               //Wake the waiting threads on every path; returning here without a broadcast would hang them
                pthread_cond_broadcast(tdata->threshold);
        } else {
+               //Loop on the predicate: a condition wait may return spuriously
-               pthread_cond_wait(tdata->threshold, tdata->lock);
+               while(*(tdata->count) < tdata->pilecount) pthread_cond_wait(tdata->threshold, tdata->lock);
        }       
 
-       //process the participant's request
-       if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
-               printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+       pthread_mutex_unlock(tdata->lock);
+       if (sendResponse(tdata, sd) == 0) { 
+               printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
-               pthread_mutex_unlock(tdata->lock);
+               //the lock was already released above; unlocking it again here is undefined behaviour
+               close(sd);
                return NULL;
        }
-       pthread_mutex_unlock(tdata->lock);
-
        close(sd);
        pthread_exit(NULL);
 }
@@ -312,6 +303,7 @@ int transCommit(transrecord_t *record) {
        char transid[TID_LEN];
        static int newtid = 0;
        trans_req_data_t *tosend;
+       char treplyctrl = 0, treplyretry = 0; //keep track of the common response that needs to be sent
 
        ptr = record->lookupTable->table;
        size = record->lookupTable->size;
@@ -328,14 +320,12 @@ int transCommit(transrecord_t *record) {
                        //Get machine location for object id
                        
                        if ((machinenum = lhashSearch(curr->key)) == 0) {
-                              printf("Error: No such machine\n");
+                              printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
                               return 1;
                        }
                                        
-                       //TODO only for debug
-                       //machinenum = 1;
                        if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
-                               printf("Error: No such oid\n");
+                               printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
                                return 1;
                        }
                        //Make machine groups
@@ -378,6 +368,7 @@ int transCommit(transrecord_t *record) {
                                
        pListMid(pile, listmid);
        //Process each machine group
+       //TODO: move the per-machine-group while loop below into its own function
        while(tmp != NULL) {
                //Create transaction id
                newtid++;
@@ -403,6 +394,8 @@ int transCommit(transrecord_t *record) {
                thread_data_array[numthreads].threshold = &tcond;
                thread_data_array[numthreads].lock = &tlock;
                thread_data_array[numthreads].count = &trecvcount;
+               thread_data_array[numthreads].replyctrl = &treplyctrl;
+               thread_data_array[numthreads].replyretry = &treplyretry;
                thread_data_array[numthreads].rec = record;
 
                rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);  
@@ -410,10 +403,8 @@ int transCommit(transrecord_t *record) {
                        perror("Error in pthread create");
                        return 1;
                }               
-
                numthreads++;           
                //TODO frees 
-               //free(tosend);
                tmp = tmp->next;
        }
 
@@ -431,11 +422,18 @@ int transCommit(transrecord_t *record) {
        //Free resources        
        pthread_cond_destroy(&tcond);
        pthread_mutex_destroy(&tlock);
-//     for(i = 0 ;i< pilecount ;i++) {
-               free(tosend);
-//     }
+
+       free(tosend);
        free(listmid);
        pDelete(pile);
+       if(treplyretry == 1) {
+               //wait a random amount of time
+               randomdelay();
+               //Retry the committing transaction and hand its result back to the
+               //caller, so a failure during the retry is not reported as success
+               return transCommit(record);
+       }       
+       
        return 0;
 }
 
@@ -499,7 +497,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
                        chashInsert(record->lookupTable, oid, objcopy); 
                        break;
                default:
-                       printf("Error in recv request from participant on a READ_REQUEST\n");
+                       printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
                        return NULL;
        }
        close(sd);