From: erubow Date: Wed, 12 Sep 2007 02:03:07 +0000 (+0000) Subject: Added code to read config file of IP address, and use these for the mostly fair distr... X-Git-Tag: preEdgeChange~454 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=b2aa4286156f424155a16ca2d94f42ddefd94e57;p=IRC.git Added code to read config file of IP address, and use these for the mostly fair distribution of the OID space. Updated llookup to use this static partitioning for now. --- diff --git a/Robust/src/Runtime/DSTM/interface/dstm.c b/Robust/src/Runtime/DSTM/interface/dstm.c index c586dd75..e404925e 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.c +++ b/Robust/src/Runtime/DSTM/interface/dstm.c @@ -4,12 +4,6 @@ extern int classsize[]; /* BEGIN object header */ -// Get a new object id -unsigned int getNewOID(void) { - static int id = 1; - return id+=2; -} - // Get the size of the object for a given type unsigned int objSize(objheader_t *object) { return classsize[TYPE(object)]; diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index ffa30361..59de2485 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -206,6 +206,8 @@ int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int); int dstmStartup(const char *); void transInit(); +int processConfigFile(); +void addHost(unsigned int); void randomdelay(void); transrecord_t *transStart(); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 8aa6e35f..dff96e32 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -105,7 +105,6 @@ void *dstmAccept(void *acceptfd) int fd_flags = fcntl((int)acceptfd, F_GETFD), size; - printf("Recieved connection: fd = %d\n", (int)acceptfd); /* Receive control messages from other machines */ if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) { if (retval == 0) { @@ -182,9 +181,9 @@ void *dstmAccept(void *acceptfd) retval); else { //TODO: execute run method on this global thread object - printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=%d\n", oid); + printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=0x%x\n", oid); objType = getObjType(oid); - printf("dstmAccept(): type of object %d is %d\n", oid, objType); + printf("dstmAccept(): type of object 0x%x is %d\n", oid, objType); } break; @@ -195,8 +194,6 @@ void *dstmAccept(void *acceptfd) /* Close connection */ if (close((int)acceptfd) == -1) perror("close"); - else - printf("Closed connection: fd = %d\n", (int)acceptfd); pthread_exit(NULL); } diff --git a/Robust/src/Runtime/DSTM/interface/llookup.c b/Robust/src/Runtime/DSTM/interface/llookup.c index cc989a28..30760978 100644 --- a/Robust/src/Runtime/DSTM/interface/llookup.c +++ b/Robust/src/Runtime/DSTM/interface/llookup.c @@ -9,6 +9,36 @@ ***************************************************************************************************/ #include "llookup.h" +#ifdef SIMPLE_LLOOKUP + +extern unsigned int *hostIpAddrs; +extern unsigned int oidsPerBlock; + +unsigned int lhashCreate(unsigned int size, float loadfactor) +{ + return 0; +} + +unsigned int lhashInsert(unsigned int oid, unsigned int mid) +{ + return 0; +} + +unsigned int lhashSearch(unsigned int oid) +{ + if (oidsPerBlock == 0) + return hostIpAddrs[0]; + else + return hostIpAddrs[oid / oidsPerBlock]; +} + +unsigned int lhashRemove(unsigned int oid) +{ + return 0; +} + +#else + lhashtable_t llookup; //Global Hash table // Creates a hash table with size and an array of lhashlistnode_t @@ -193,3 +223,6 @@ unsigned int lhashResize(unsigned int newsize) { free(ptr); //Free the memory of the old hash table return 0; } + +#endif + diff --git a/Robust/src/Runtime/DSTM/interface/llookup.h b/Robust/src/Runtime/DSTM/interface/llookup.h index 7a70b9bd..20dbac72 100644 --- a/Robust/src/Runtime/DSTM/interface/llookup.h +++ b/Robust/src/Runtime/DSTM/interface/llookup.h @@ -5,6 +5,8 @@ #include #include +#define SIMPLE_LLOOKUP + #define LOADFACTOR 0.75 #define HASH_SIZE 100 @@ -22,11 +24,17 @@ typedef struct lhashtable { pthread_mutex_t locktable; } lhashtable_t; -unsigned int lhashCreate(unsigned int size, float loadfactor);// returns 0 for success and 0 for failure -unsigned int lhashFunction(unsigned int oid); // returns 0 for success and 0 for failure -unsigned int lhashInsert(unsigned int oid, unsigned int mid); // returns 0 for success and 0 for failure -unsigned int lhashSearch(unsigned int oid); //returns mid, 0 if not found -unsigned int lhashRemove(unsigned int oid); //returns 0 if not success -unsigned int lhashResize(unsigned int newsize); // returns 0 for success and 0 for failure +//returns 0 for success and 1 for failure +unsigned int lhashCreate(unsigned int size, float loadfactor); +//returns 0 for success and 1 for failure +unsigned int lhashInsert(unsigned int oid, unsigned int mid); +//returns mid, 0 if not found +unsigned int lhashSearch(unsigned int oid); +//returns 0 for success and 1 for failure +unsigned int lhashRemove(unsigned int oid); + +//helper functions +unsigned int lhashResize(unsigned int newsize); +unsigned int lhashFunction(unsigned int oid); #endif diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index c5f3500a..487b3cb3 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -23,6 +23,7 @@ #define RECEIVE_BUFFER_SIZE 2048 #define NUM_THREADS 10 #define PREFETCH_CACHE_SIZE 1048576 //1MB +#define CONFIG_FILENAME "dstm.conf" /* Global Variables */ extern int classsize[]; @@ -34,6 +35,13 @@ pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch qu pthread_t tPrefetch; extern objstr_t *mainobjstore; unsigned int myIpAddr; +unsigned int *hostIpAddrs; +int sizeOfHostArray; +int numHostsInSystem; +int myIndexInHostArray; +unsigned int oidsPerBlock; +unsigned int oidMin; +unsigned int oidMax; plistnode_t *createPiles(transrecord_t *); inline int arrayLength(int *array) { @@ -87,7 +95,8 @@ int dstmStartup(const char * option) { pthread_attr_t attr; int master=strcmp(option, "master")==0; - myIpAddr = getMyIpAddr("eth0"); + if (processConfigFile() != 0) + return 0; //TODO: return error value, cause main program to exit dstmInit(); transInit(); @@ -1558,3 +1567,113 @@ int startRemoteThread(unsigned int oid, unsigned int mid) return status; } +//TODO: when reusing oids, make sure they are not already in use! +unsigned int getNewOID(void) { + static unsigned int id = 0xFFFFFFFF; + + id += 2; + if (id > oidMax || id < oidMin) + { + id = (oidMin | 1); + } + return id; +} + +int processConfigFile() +{ + FILE *configFile; + const int maxLineLength = 200; + char lineBuffer[maxLineLength]; + char *token; + const char *delimiters = " \t\n"; + char *commentBegin; + in_addr_t tmpAddr; + + configFile = fopen(CONFIG_FILENAME, "r"); + if (configFile == NULL) + { + printf("error opening %s:\n", CONFIG_FILENAME); + perror(""); + return -1; + } + + numHostsInSystem = 0; + sizeOfHostArray = 8; + hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int)); + + while(fgets(lineBuffer, maxLineLength, configFile) != NULL) + { + commentBegin = strchr(lineBuffer, '#'); + if (commentBegin != NULL) + *commentBegin = '\0'; + token = strtok(lineBuffer, delimiters); + while (token != NULL) + { + tmpAddr = inet_addr(token); + if ((int)tmpAddr == -1) + { + printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token); + fclose(configFile); + return -1; + } + else + addHost(htonl(tmpAddr)); + token = strtok(NULL, delimiters); + } + } + + fclose(configFile); + + if (numHostsInSystem < 1) + { + printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME); + return -1; + } + myIpAddr = getMyIpAddr("eth0"); + myIndexInHostArray = findHost(myIpAddr); + if (myIndexInHostArray == -1) + { + printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME); + return -1; + } + oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1; + oidMin = oidsPerBlock * myIndexInHostArray; + if (myIndexInHostArray == numHostsInSystem - 1) + oidMax = 0xFFFFFFFF; + else + oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1; + + return 0; +} + +void addHost(unsigned int hostIp) +{ + unsigned int *tmpArray; + + if (findHost(hostIp) != -1) + return; + + if (numHostsInSystem == sizeOfHostArray) + { + tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int)); + memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem); + free(hostIpAddrs); + hostIpAddrs = tmpArray; + } + + hostIpAddrs[numHostsInSystem++] = hostIp; + + return; +} + +int findHost(unsigned int hostIp) +{ + int i; + for (i = 0; i < numHostsInSystem; i++) + if (hostIpAddrs[i] == hostIp) + return i; + + //not found + return -1; +} +