Fixed bugs for memory leak . Trying to run two transactions commit
authoradash <adash>
Mon, 7 May 2007 08:45:12 +0000 (08:45 +0000)
committeradash <adash>
Mon, 7 May 2007 08:45:12 +0000 (08:45 +0000)
at the same time.
TODO: Fix another bug that causes trans to commit when it should be a
soft abort.

Robust/src/Runtime/DSTM/interface/Makefile
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/ip.c
Robust/src/Runtime/DSTM/interface/ip.h
Robust/src/Runtime/DSTM/interface/testclient.c
Robust/src/Runtime/DSTM/interface/testserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 7b693293e92d3a60e4be73fc6972b1d6acae3227..ee831a546d4812e4c988745facebbe352842d87e 100644 (file)
@@ -1,11 +1,16 @@
-client:
-       gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+d-2:
+       gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+
+demksy:
+       gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+
+d-1:
+       gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
 
-server:
-       gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
 all:
-       gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
-       gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+       gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+       gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+       gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
 
 clean:
-       rm client server
+       rm d-2 d-1 demsky
index 5d292c9d8b2c4f2ec0a7eb91b42e2257d1bbbaf6..56200e78087bab293b169ac8328758bcad32e919 100644 (file)
@@ -160,11 +160,9 @@ void *dstmAccept(void *acceptfd)
        else
                printf("Closed connection: fd = %d\n", (int)acceptfd);
        
-       //Free memory
-       free(transinfo.objmod);
-       free(transinfo.objlocked);
-       free(transinfo.objnotfound);
+       
        pthread_exit(NULL);
+       printf("DEBUG -> Exiting dstmAccept\n");
 }
 
 int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
@@ -323,7 +321,21 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
                        //TODO Use fixed.trans_id  TID since Client may have died
                        break;
        }
-
+       //Free memory
+       printf("DEBUG -> Freeing...");
+       fflush(stdout);
+       if (transinfo->objmod != NULL) {
+               free(transinfo->objmod);
+               transinfo->objmod = NULL;
+       }
+       if (transinfo->objlocked != NULL) {
+               free(transinfo->objlocked);
+               transinfo->objlocked = NULL;
+       }
+       if (transinfo->objnotfound != NULL) {
+               free(transinfo->objnotfound);
+               transinfo->objnotfound = NULL;
+       }
        return 0;
 }
 
@@ -340,7 +352,6 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
        oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
        oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
        oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
-
        // Counters and arrays to formulate decision on control message to be sent
        int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
        int objmodnotfound = 0, nummodfound = 0;
@@ -392,6 +403,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
                                ((objheader_t *)mobj)->status |= LOCK;
                                //Save all object oids that are locked on this machine during this transaction request call
                                oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
+                               printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid);
                                objlocked++;
                                if (version == ((objheader_t *)mobj)->version) { //If versions match
                                        v_matchnolock++;
@@ -410,12 +422,12 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran
                }
        }
 
-       //printf("No of objs locked = %d\n", objlocked);
-       //printf("No of v_nomatch = %d\n", v_nomatch);
-       //printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
-       //printf("No of objs v_match but had locks before = %d\n", v_matchlock);
-       //printf("No of objs not found = %d\n", objnotfound);
-       //printf("No of objs modified but not found = %d\n", objmodnotfound);
+       printf("No of objs locked = %d\n", objlocked);
+       printf("No of v_nomatch = %d\n", v_nomatch);
+       printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
+       printf("No of objs v_match but had locks before = %d\n", v_matchlock);
+       printf("No of objs not found = %d\n", objnotfound);
+       printf("No of objs modified but not found = %d\n", objmodnotfound);
 
        //Decide what control message(s) to send
        if(v_matchnolock == fixed->numread + fixed->nummod) {
@@ -510,7 +522,8 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
        if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
                perror("Error sending ACK to coordinator\n");
        }
-
+       
+       printf("DEBUG-> Completed the pending transaction\n");
        return 0;
 }
 
index bf1cee58d1892998854d47e22e55dc5c8db225b9..e15e89bb86049c7dc46a501304e3b7f4ba9f278f 100644 (file)
@@ -1,6 +1,13 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include "ip.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <string.h>
+
+#define LISTEN_PORT 2156
 
 unsigned int iptoMid(char *addr) {
        ip_t i;
@@ -8,6 +15,7 @@ unsigned int iptoMid(char *addr) {
 
        sscanf(addr, "%d.%d.%d.%d", &i.a, &i.b, &i.c, &i.d);
        mid = (i.a << 24) | (i.b << 16) | (i.c << 8) | i.d;
+       fflush(stdout);
        return mid;
 }
 
@@ -22,6 +30,31 @@ void midtoIP(unsigned int mid, char *ptr) {
        return;
 }
 
+int checkServer(int mid, char *machineip) {
+       int tmpsd;
+       struct sockaddr_in serv_addr;
+       char m[20];
+
+       strncpy(m, machineip, strlen(machineip));
+       // Foreach machine you want to transact with
+       // check if its up and running
+       if ((tmpsd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+               perror("");
+               return(-1);
+       }
+       bzero((char*) &serv_addr, sizeof(serv_addr));
+       serv_addr.sin_family = AF_INET;
+       serv_addr.sin_port = htons(LISTEN_PORT);
+       midtoIP(mid, m);
+       m[15] = '\0';
+       serv_addr.sin_addr.s_addr = inet_addr(m);
+       while (connect(tmpsd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+               sleep(1);
+       }
+       printf("DEBUG -> Connection established with %s\n", machineip);
+       close(tmpsd);
+       return 0;
+}
 /*
 main() {
        unsigned int mid;
index 3d9237b0f3a2ab301c2f4457afde83273b3b2be1..4dcc5f76f21c65c48040cab84d79646158087cfb 100644 (file)
@@ -10,5 +10,6 @@ typedef struct ip {
 
 unsigned int iptoMid(char *);
 void midtoIP(unsigned int, char *);
+int checkServer(int, char *);
 
 #endif
index 688d44a3bfa76a6f393e0b9799874398fb5f37bf..b27456a9f53d45739c5a52ed92389c9984b45348 100644 (file)
@@ -6,6 +6,7 @@
 //#include <sys/socket.h>
 //#include <netinet/in.h>
 //#include <arpa/inet.h>
+#define LISTEN_PORT 2156
 
 extern objstr_t *mainobjstore;
 //extern lhashtable_t llookup;         //Global Hash table
@@ -79,7 +80,9 @@ int main()
 //     test2();
 //     test3();
 //     test4();
-       test5();
+       //test5();
+       test6();
+
 }
 
 int test1(void) {
@@ -266,3 +269,93 @@ int test5(void) {
        transCommit(record);
        pthread_join(thread_Listen, NULL);
 }
+
+int test6(void) {
+       transrecord_t *record;
+       objheader_t *header;
+       unsigned int size, mid;
+       pthread_t thread_Listen;
+       pthread_attr_t attr;
+       objheader_t *h1,*h2, *h3, *h4, *h5, *h6;
+       int tmpsd;
+
+       dstmInit();
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+       //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+
+       record = transStart();
+       //printf("DEBUG -> Init done\n");
+       mid = iptoMid("128.200.9.10");// Machine demsky.eecs.uci.edu    
+       lhashInsert(1,mid);
+       lhashInsert(2,mid);
+       lhashInsert(3,mid);
+       lhashInsert(4,mid);
+       lhashInsert(5,mid);
+       lhashInsert(6,mid);
+       
+       mid = iptoMid("128.200.9.26");// Machine demsky.eecs.uci.edu    
+       lhashInsert(31,mid);
+       lhashInsert(32,mid);
+       lhashInsert(33,mid);
+       
+       pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+       
+       checkServer(mid, "128.200.9.26");
+       mid = iptoMid("128.200.9.10");
+       checkServer(mid, "128.200.9.10");
+
+       //Create and Insert Oid 20
+       size = sizeof(objheader_t) + classsize[2] ;
+       header = (objheader_t *) objstrAlloc(mainobjstore, size);
+       header->oid = 20;
+       header->type = 2;
+       header->version = 1;
+       header->rcount = 0; //? not sure how to handle this yet
+       header->status = 0;
+       header->status |= NEW;
+       mhashInsert(header->oid, header);
+       mid = iptoMid("128.200.9.27");
+       lhashInsert(header->oid, mid);
+
+       //Create and Insert Oid 21
+       size = sizeof(objheader_t) + classsize[1] ;
+       header = (objheader_t *) objstrAlloc(mainobjstore, size);
+       header->oid = 21;
+       header->type = 1;
+       header->version = 1;
+       header->rcount = 0; //? not sure how to handle this yet
+       header->status = 0;
+       header->status |= NEW;
+       mhashInsert(header->oid, header);
+       mid = iptoMid("128.200.9.27");
+       lhashInsert(header->oid, mid);
+       sleep(3);       
+       //read object 1  //from demsky
+       if((h1 = transRead(record, 1)) == NULL){
+               printf("Object not found\n");
+       }
+       //read object 2
+       if((h2 = transRead(record, 2)) == NULL) {
+               printf("Object not found\n");
+       }
+       //read object 31 //Found in d-1
+       if((h2 = transRead(record, 31)) == NULL) {
+               printf("Object not found\n");
+       }
+       //read object 32 //Found in d-1
+       if((h2 = transRead(record, 32)) == NULL) {
+               printf("Object not found\n");
+       }
+       //read object 20(present in local machine)
+       if((h3 = transRead(record, 20)) == NULL) {
+               printf("Object not found\n");
+       }
+       //read object 21(present in local machine)
+       if((h4 = transRead(record, 21)) == NULL) {
+               printf("Object not found\n");
+       }
+       transCommit(record);
+       pthread_join(thread_Listen, NULL);
+       return 0;
+}
index 46f17bb14dad2eddf93b5835e426470436615b3a..ace2a94ff0c71424db931992a890890b22f668cc 100644 (file)
@@ -3,18 +3,21 @@
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
+#include "ip.h"
 
 extern objstr_t *mainobjstore;
 int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
 
 int test1(void);
 int test2(void);
+int test3(void);
 
 unsigned int createObjects(transrecord_t *record, unsigned short type) {
        objheader_t *header, *tmp;
        struct sockaddr_in antelope;
        unsigned int size, mid;
        size = sizeof(objheader_t) + classsize[type] ;
+       //Inserts in chashtable
        header = transCreateObj(record, type);
        tmp = (objheader_t *) objstrAlloc(mainobjstore, size);
        memcpy(tmp, header, size);
@@ -27,9 +30,22 @@ unsigned int createObjects(transrecord_t *record, unsigned short type) {
        return 0;
 }
 
+void init_obj(objheader_t *h, unsigned int oid, unsigned short type, \
+               unsigned short version,\
+               unsigned short rcount, char status) {
+       h->oid = oid;
+       h->type = type;
+       h->version = version;
+       h->rcount = rcount;
+       h->status |= status;
+       return;
+}
+
+
 int main()
 {
-       test2();
+       //sleep(3);
+       test3();
 }
 
 int test1()
@@ -79,7 +95,7 @@ int test2() {
        pthread_t thread_Listen;
 
        dstmInit();     
-       mid = iptoMid("128.200.9.27");
+       mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
        //Inserting into lhashtable
        lhashInsert(20, mid);
        lhashInsert(21, mid);
@@ -119,3 +135,143 @@ int test2() {
        }
        pthread_join(thread_Listen, NULL);
 }
+
+int test3() {
+       
+       unsigned int val, mid;
+       transrecord_t *myTrans;
+       unsigned int size;
+       objheader_t *header;
+       pthread_t thread_Listen;
+       pthread_attr_t attr;
+       objheader_t *h1, *h2, *h3, *h4, *h5;
+
+       dstmInit();     
+       pthread_attr_init(&attr);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+       mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
+       //Inserting into lhashtable
+       lhashInsert(20, mid);
+       lhashInsert(21, mid);
+       lhashInsert(22, mid);
+
+       mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+       //Inserting into lhashtable
+       lhashInsert(31, mid);
+       lhashInsert(32, mid);
+       lhashInsert(33, mid);
+       pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+//     pthread_create(&thread_Listen, NULL, dstmListen, NULL);
+
+       printf("DEBUG -> mid = %d\n", mid);
+       checkServer(mid, "128.200.9.26");
+       mid = iptoMid("128.200.9.27");
+       printf("DEBUG -> mid = %d\n", mid);
+       checkServer(mid, "128.200.9.27");
+
+       // Start Transaction    
+       myTrans = transStart();
+/*
+       //Create Object1
+       if((val = createObjects(myTrans, 0)) != 0) {
+               printf("Error transCreateObj1");
+       }
+       //Create Object2
+       if((val = createObjects(myTrans, 1)) != 0) {
+               printf("Error transCreateObj2");
+       }
+       //Create Object3
+       if((val = createObjects(myTrans, 2)) != 0) {
+               printf("Error transCreateObj3");
+       }
+       //Create Object4
+       if((val = createObjects(myTrans, 3)) != 0) {
+               printf("Error transCreateObj4");
+       }
+       //Create Object5
+       if((val = createObjects(myTrans, 0)) != 0) {
+               printf("Error transCreateObj5");
+       }
+       //Create Object6
+       if((val = createObjects(myTrans, 1)) != 0) {
+               printf("Error transCreateObj6");
+       }
+
+       */
+       // Create and Insert Oid 1
+       size = sizeof(objheader_t) + classsize[0] ;
+       header = (objheader_t *) objstrAlloc(mainobjstore, size);
+       init_obj(header, 1, 0, 1, 0, NEW);
+       mhashInsert(header->oid, header);
+       mid = iptoMid("128.200.9.10");
+       lhashInsert(header->oid, mid);
+
+       // Create and Insert Oid 2
+       size = sizeof(objheader_t) + classsize[1] ;
+       header = (objheader_t *) objstrAlloc(mainobjstore, size);
+       init_obj(header, 2, 1, 1, 0, NEW);
+       mhashInsert(header->oid, header);
+       mid = iptoMid("128.200.9.10");
+       lhashInsert(header->oid, mid);
+
+
+       // Create and Insert Oid 3
+       size = sizeof(objheader_t) + classsize[2] ;
+       header = (objheader_t *) objstrAlloc(mainobjstore, size);
+       init_obj(header, 3, 2, 1, 0, NEW);
+       mhashInsert(header->oid, header);
+       mid = iptoMid("128.200.9.10");
+       lhashInsert(header->oid, mid);
+
+       // Create and Insert Oid 4
+       size = sizeof(objheader_t) + classsize[3] ;
+       header = (objheader_t *) objstrAlloc(mainobjstore, size);
+       init_obj(header, 4, 3, 1, 0, NEW);
+       mhashInsert(header->oid, header);
+       mid = iptoMid("128.200.9.10");
+       lhashInsert(header->oid, mid);
+
+       // Create and Insert Oid 5
+       size = sizeof(objheader_t) + classsize[0] ;
+       header = (objheader_t *) objstrAlloc(mainobjstore, size);
+       init_obj(header, 5, 0, 1, 0, NEW);
+       mhashInsert(header->oid, header);
+       mid = iptoMid("128.200.9.10");
+       lhashInsert(header->oid, mid);
+       
+       // Create and Insert Oid 6
+       size = sizeof(objheader_t) + classsize[1] ;
+       header = (objheader_t *) objstrAlloc(mainobjstore, size);
+       init_obj(header, 6, 1, 1, 0, NEW);
+       mhashInsert(header->oid, header);
+       mid = iptoMid("128.200.9.10");
+       lhashInsert(header->oid, mid);
+       
+       //read object 1(present in local machine)
+       if((h1 = transRead(myTrans, 1)) == NULL){
+               printf("Object not found\n");
+       }
+       //read object 2present in local machine)
+       if((h2 = transRead(myTrans, 2)) == NULL) {
+               printf("Object not found\n");
+       }
+       //read object 3(present in local machine)
+       if((h3 = transRead(myTrans, 3)) == NULL) {
+               printf("Object not found\n");
+       }
+       //read object 31 (present in d-1. eecs)
+       if((h4 = transRead(myTrans, 31)) == NULL) {
+               printf("Object not found\n");
+       }
+       //read object 20 (present in d-2. eecs)
+       if((h5 = transRead(myTrans, 20)) == NULL) {
+               printf("Object not found\n");
+       }
+
+       transCommit(myTrans);
+
+       pthread_join(thread_Listen, NULL);
+
+       return 0;
+}
index 369b34537d2b38e1f3ec97e6fc2faa508e54d17e..24f03c639c15de9a053eb1149c6f66426b6c4396 100644 (file)
@@ -38,7 +38,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                //Look up in Machine lookup table and found
-               printf("oid is found in Local mlookup\n");
+
+               //printf("oid is found in Local machinelookup\n");
                tmp = mhashSearch(oid);
                size = sizeof(objheader_t)+classsize[tmp->type];
                //Copy into cache
@@ -49,7 +50,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                return(objcopy);
        } else {
                //Get the object from the remote location
-               printf("oid is found in remote machine\n");
+               //printf("oid is found in remote machine\n");
                machinenumber = lhashSearch(oid);
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL) {
@@ -470,7 +471,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        switch(control) {
                case OBJECT_NOT_FOUND:
                        return NULL;
-                       break;
                case OBJECT_FOUND:
                        if((val = read(sd, &size, sizeof(int))) <= 0) {
                                perror("No size is read from the participant\n");