From 240ee07dc610ac00b42ecd68928861303ed2df5a Mon Sep 17 00:00:00 2001 From: adash Date: Mon, 7 May 2007 08:45:12 +0000 Subject: [PATCH] Fixed bugs for memory leak . Trying to run two transactions commit at the same time. TODO: Fix another bug that causes trans to commit when it should be a soft abort. --- Robust/src/Runtime/DSTM/interface/Makefile | 19 ++- .../src/Runtime/DSTM/interface/dstmserver.c | 39 +++-- Robust/src/Runtime/DSTM/interface/ip.c | 33 ++++ Robust/src/Runtime/DSTM/interface/ip.h | 1 + .../src/Runtime/DSTM/interface/testclient.c | 95 ++++++++++- .../src/Runtime/DSTM/interface/testserver.c | 160 +++++++++++++++++- Robust/src/Runtime/DSTM/interface/trans.c | 6 +- 7 files changed, 327 insertions(+), 26 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/Makefile b/Robust/src/Runtime/DSTM/interface/Makefile index 7b693293..ee831a54 100644 --- a/Robust/src/Runtime/DSTM/interface/Makefile +++ b/Robust/src/Runtime/DSTM/interface/Makefile @@ -1,11 +1,16 @@ -client: - gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c +d-2: + gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c + +demksy: + gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c + +d-1: + gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c -server: - gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c all: - gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c - gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c + gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c + gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c + gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c clean: - rm client server + rm d-2 d-1 demsky diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 5d292c9d..56200e78 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -160,11 +160,9 @@ void *dstmAccept(void *acceptfd) else printf("Closed connection: fd = %d\n", (int)acceptfd); - //Free memory - free(transinfo.objmod); - free(transinfo.objlocked); - free(transinfo.objnotfound); + pthread_exit(NULL); + printf("DEBUG -> Exiting dstmAccept\n"); } int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { @@ -323,7 +321,21 @@ int readClientReq(int acceptfd, trans_commit_data_t *transinfo) { //TODO Use fixed.trans_id TID since Client may have died break; } - + //Free memory + printf("DEBUG -> Freeing..."); + fflush(stdout); + if (transinfo->objmod != NULL) { + free(transinfo->objmod); + transinfo->objmod = NULL; + } + if (transinfo->objlocked != NULL) { + free(transinfo->objlocked); + transinfo->objlocked = NULL; + } + if (transinfo->objnotfound != NULL) { + free(transinfo->objnotfound); + transinfo->objnotfound = NULL; + } return 0; } @@ -340,7 +352,6 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int)); - // Counters and arrays to formulate decision on control message to be sent int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; int objmodnotfound = 0, nummodfound = 0; @@ -392,6 +403,7 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran ((objheader_t *)mobj)->status |= LOCK; //Save all object oids that are locked on this machine during this transaction request call oidlocked[objlocked] = ((objheader_t *)mobj)->oid; + printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid); objlocked++; if (version == ((objheader_t *)mobj)->version) { //If versions match v_matchnolock++; @@ -410,12 +422,12 @@ char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *tran } } - //printf("No of objs locked = %d\n", objlocked); - //printf("No of v_nomatch = %d\n", v_nomatch); - //printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock); - //printf("No of objs v_match but had locks before = %d\n", v_matchlock); - //printf("No of objs not found = %d\n", objnotfound); - //printf("No of objs modified but not found = %d\n", objmodnotfound); + printf("No of objs locked = %d\n", objlocked); + printf("No of v_nomatch = %d\n", v_nomatch); + printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock); + printf("No of objs v_match but had locks before = %d\n", v_matchlock); + printf("No of objs not found = %d\n", objnotfound); + printf("No of objs modified but not found = %d\n", objmodnotfound); //Decide what control message(s) to send if(v_matchnolock == fixed->numread + fixed->nummod) { @@ -510,7 +522,8 @@ int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) { if(send((int)acceptfd, &control, sizeof(char), 0) < 0) { perror("Error sending ACK to coordinator\n"); } - + + printf("DEBUG-> Completed the pending transaction\n"); return 0; } diff --git a/Robust/src/Runtime/DSTM/interface/ip.c b/Robust/src/Runtime/DSTM/interface/ip.c index bf1cee58..e15e89bb 100644 --- a/Robust/src/Runtime/DSTM/interface/ip.c +++ b/Robust/src/Runtime/DSTM/interface/ip.c @@ -1,6 +1,13 @@ #include #include #include "ip.h" +#include +#include +#include +#include +#include + +#define LISTEN_PORT 2156 unsigned int iptoMid(char *addr) { ip_t i; @@ -8,6 +15,7 @@ unsigned int iptoMid(char *addr) { sscanf(addr, "%d.%d.%d.%d", &i.a, &i.b, &i.c, &i.d); mid = (i.a << 24) | (i.b << 16) | (i.c << 8) | i.d; + fflush(stdout); return mid; } @@ -22,6 +30,31 @@ void midtoIP(unsigned int mid, char *ptr) { return; } +int checkServer(int mid, char *machineip) { + int tmpsd; + struct sockaddr_in serv_addr; + char m[20]; + + strncpy(m, machineip, strlen(machineip)); + // Foreach machine you want to transact with + // check if its up and running + if ((tmpsd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror(""); + return(-1); + } + bzero((char*) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(LISTEN_PORT); + midtoIP(mid, m); + m[15] = '\0'; + serv_addr.sin_addr.s_addr = inet_addr(m); + while (connect(tmpsd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { + sleep(1); + } + printf("DEBUG -> Connection established with %s\n", machineip); + close(tmpsd); + return 0; +} /* main() { unsigned int mid; diff --git a/Robust/src/Runtime/DSTM/interface/ip.h b/Robust/src/Runtime/DSTM/interface/ip.h index 3d9237b0..4dcc5f76 100644 --- a/Robust/src/Runtime/DSTM/interface/ip.h +++ b/Robust/src/Runtime/DSTM/interface/ip.h @@ -10,5 +10,6 @@ typedef struct ip { unsigned int iptoMid(char *); void midtoIP(unsigned int, char *); +int checkServer(int, char *); #endif diff --git a/Robust/src/Runtime/DSTM/interface/testclient.c b/Robust/src/Runtime/DSTM/interface/testclient.c index 688d44a3..b27456a9 100644 --- a/Robust/src/Runtime/DSTM/interface/testclient.c +++ b/Robust/src/Runtime/DSTM/interface/testclient.c @@ -6,6 +6,7 @@ //#include //#include //#include +#define LISTEN_PORT 2156 extern objstr_t *mainobjstore; //extern lhashtable_t llookup; //Global Hash table @@ -79,7 +80,9 @@ int main() // test2(); // test3(); // test4(); - test5(); + //test5(); + test6(); + } int test1(void) { @@ -266,3 +269,93 @@ int test5(void) { transCommit(record); pthread_join(thread_Listen, NULL); } + +int test6(void) { + transrecord_t *record; + objheader_t *header; + unsigned int size, mid; + pthread_t thread_Listen; + pthread_attr_t attr; + objheader_t *h1,*h2, *h3, *h4, *h5, *h6; + int tmpsd; + + dstmInit(); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + + record = transStart(); + //printf("DEBUG -> Init done\n"); + mid = iptoMid("128.200.9.10");// Machine demsky.eecs.uci.edu + lhashInsert(1,mid); + lhashInsert(2,mid); + lhashInsert(3,mid); + lhashInsert(4,mid); + lhashInsert(5,mid); + lhashInsert(6,mid); + + mid = iptoMid("128.200.9.26");// Machine demsky.eecs.uci.edu + lhashInsert(31,mid); + lhashInsert(32,mid); + lhashInsert(33,mid); + + pthread_create(&thread_Listen, &attr, dstmListen, NULL); + + checkServer(mid, "128.200.9.26"); + mid = iptoMid("128.200.9.10"); + checkServer(mid, "128.200.9.10"); + + //Create and Insert Oid 20 + size = sizeof(objheader_t) + classsize[2] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 20; + header->type = 2; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + + //Create and Insert Oid 21 + size = sizeof(objheader_t) + classsize[1] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + header->oid = 21; + header->type = 1; + header->version = 1; + header->rcount = 0; //? not sure how to handle this yet + header->status = 0; + header->status |= NEW; + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.27"); + lhashInsert(header->oid, mid); + sleep(3); + //read object 1 //from demsky + if((h1 = transRead(record, 1)) == NULL){ + printf("Object not found\n"); + } + //read object 2 + if((h2 = transRead(record, 2)) == NULL) { + printf("Object not found\n"); + } + //read object 31 //Found in d-1 + if((h2 = transRead(record, 31)) == NULL) { + printf("Object not found\n"); + } + //read object 32 //Found in d-1 + if((h2 = transRead(record, 32)) == NULL) { + printf("Object not found\n"); + } + //read object 20(present in local machine) + if((h3 = transRead(record, 20)) == NULL) { + printf("Object not found\n"); + } + //read object 21(present in local machine) + if((h4 = transRead(record, 21)) == NULL) { + printf("Object not found\n"); + } + transCommit(record); + pthread_join(thread_Listen, NULL); + return 0; +} diff --git a/Robust/src/Runtime/DSTM/interface/testserver.c b/Robust/src/Runtime/DSTM/interface/testserver.c index 46f17bb1..ace2a94f 100644 --- a/Robust/src/Runtime/DSTM/interface/testserver.c +++ b/Robust/src/Runtime/DSTM/interface/testserver.c @@ -3,18 +3,21 @@ #include #include #include +#include "ip.h" extern objstr_t *mainobjstore; int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)}; int test1(void); int test2(void); +int test3(void); unsigned int createObjects(transrecord_t *record, unsigned short type) { objheader_t *header, *tmp; struct sockaddr_in antelope; unsigned int size, mid; size = sizeof(objheader_t) + classsize[type] ; + //Inserts in chashtable header = transCreateObj(record, type); tmp = (objheader_t *) objstrAlloc(mainobjstore, size); memcpy(tmp, header, size); @@ -27,9 +30,22 @@ unsigned int createObjects(transrecord_t *record, unsigned short type) { return 0; } +void init_obj(objheader_t *h, unsigned int oid, unsigned short type, \ + unsigned short version,\ + unsigned short rcount, char status) { + h->oid = oid; + h->type = type; + h->version = version; + h->rcount = rcount; + h->status |= status; + return; +} + + int main() { - test2(); + //sleep(3); + test3(); } int test1() @@ -79,7 +95,7 @@ int test2() { pthread_t thread_Listen; dstmInit(); - mid = iptoMid("128.200.9.27"); + mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu //Inserting into lhashtable lhashInsert(20, mid); lhashInsert(21, mid); @@ -119,3 +135,143 @@ int test2() { } pthread_join(thread_Listen, NULL); } + +int test3() { + + unsigned int val, mid; + transrecord_t *myTrans; + unsigned int size; + objheader_t *header; + pthread_t thread_Listen; + pthread_attr_t attr; + objheader_t *h1, *h2, *h3, *h4, *h5; + + dstmInit(); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu + //Inserting into lhashtable + lhashInsert(20, mid); + lhashInsert(21, mid); + lhashInsert(22, mid); + + mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu + //Inserting into lhashtable + lhashInsert(31, mid); + lhashInsert(32, mid); + lhashInsert(33, mid); + pthread_create(&thread_Listen, &attr, dstmListen, NULL); +// pthread_create(&thread_Listen, NULL, dstmListen, NULL); + + printf("DEBUG -> mid = %d\n", mid); + checkServer(mid, "128.200.9.26"); + mid = iptoMid("128.200.9.27"); + printf("DEBUG -> mid = %d\n", mid); + checkServer(mid, "128.200.9.27"); + + // Start Transaction + myTrans = transStart(); +/* + //Create Object1 + if((val = createObjects(myTrans, 0)) != 0) { + printf("Error transCreateObj1"); + } + //Create Object2 + if((val = createObjects(myTrans, 1)) != 0) { + printf("Error transCreateObj2"); + } + //Create Object3 + if((val = createObjects(myTrans, 2)) != 0) { + printf("Error transCreateObj3"); + } + //Create Object4 + if((val = createObjects(myTrans, 3)) != 0) { + printf("Error transCreateObj4"); + } + //Create Object5 + if((val = createObjects(myTrans, 0)) != 0) { + printf("Error transCreateObj5"); + } + //Create Object6 + if((val = createObjects(myTrans, 1)) != 0) { + printf("Error transCreateObj6"); + } + + */ + // Create and Insert Oid 1 + size = sizeof(objheader_t) + classsize[0] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 1, 0, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + // Create and Insert Oid 2 + size = sizeof(objheader_t) + classsize[1] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 2, 1, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + + // Create and Insert Oid 3 + size = sizeof(objheader_t) + classsize[2] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 3, 2, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + // Create and Insert Oid 4 + size = sizeof(objheader_t) + classsize[3] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 4, 3, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + // Create and Insert Oid 5 + size = sizeof(objheader_t) + classsize[0] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 5, 0, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + // Create and Insert Oid 6 + size = sizeof(objheader_t) + classsize[1] ; + header = (objheader_t *) objstrAlloc(mainobjstore, size); + init_obj(header, 6, 1, 1, 0, NEW); + mhashInsert(header->oid, header); + mid = iptoMid("128.200.9.10"); + lhashInsert(header->oid, mid); + + //read object 1(present in local machine) + if((h1 = transRead(myTrans, 1)) == NULL){ + printf("Object not found\n"); + } + //read object 2present in local machine) + if((h2 = transRead(myTrans, 2)) == NULL) { + printf("Object not found\n"); + } + //read object 3(present in local machine) + if((h3 = transRead(myTrans, 3)) == NULL) { + printf("Object not found\n"); + } + //read object 31 (present in d-1. eecs) + if((h4 = transRead(myTrans, 31)) == NULL) { + printf("Object not found\n"); + } + //read object 20 (present in d-2. eecs) + if((h5 = transRead(myTrans, 20)) == NULL) { + printf("Object not found\n"); + } + + transCommit(myTrans); + + pthread_join(thread_Listen, NULL); + + return 0; +} diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 369b3453..24f03c63 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -38,7 +38,8 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { //Look up in Machine lookup table and found - printf("oid is found in Local mlookup\n"); + + //printf("oid is found in Local machinelookup\n"); tmp = mhashSearch(oid); size = sizeof(objheader_t)+classsize[tmp->type]; //Copy into cache @@ -49,7 +50,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) return(objcopy); } else { //Get the object from the remote location - printf("oid is found in remote machine\n"); + //printf("oid is found in remote machine\n"); machinenumber = lhashSearch(oid); objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { @@ -470,7 +471,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { switch(control) { case OBJECT_NOT_FOUND: return NULL; - break; case OBJECT_FOUND: if((val = read(sd, &size, sizeof(int))) <= 0) { perror("No size is read from the participant\n"); -- 2.34.1