Added code to read config file of IP address, and use these for the mostly fair distr...
authorerubow <erubow>
Wed, 12 Sep 2007 02:03:07 +0000 (02:03 +0000)
committererubow <erubow>
Wed, 12 Sep 2007 02:03:07 +0000 (02:03 +0000)
Robust/src/Runtime/DSTM/interface/dstm.c
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/llookup.c
Robust/src/Runtime/DSTM/interface/llookup.h
Robust/src/Runtime/DSTM/interface/trans.c

index c586dd754c4e08dc33ee4b1f5d53b459b46b79b6..e404925e2368f6046730fe03ad786d4740afdca1 100644 (file)
@@ -4,12 +4,6 @@ extern int classsize[];
 
 /* BEGIN object header */
 
-// Get a new object id
-unsigned int getNewOID(void) {
-       static int id = 1;
-       return id+=2;
-}
-
 // Get the size of the object for a given type
 unsigned int objSize(objheader_t *object) {
        return classsize[TYPE(object)];
index ffa3036186ba0c4ddefa9bb08d477b523d5daf0b..59de24855e1229e482dd433caefffe418f16af08 100644 (file)
@@ -206,6 +206,8 @@ int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
 
 int dstmStartup(const char *);
 void transInit();
+int processConfigFile();
+void addHost(unsigned int);
 
 void randomdelay(void);
 transrecord_t *transStart();
index 8aa6e35f3b64e5f2b3b6a2a4f2e38452937ba840..dff96e3292db479bd691513ffbee085ac0eb837f 100644 (file)
@@ -105,7 +105,6 @@ void *dstmAccept(void *acceptfd)
        
        int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
 
-       printf("Recieved connection: fd = %d\n", (int)acceptfd);
        /* Receive control messages from other machines */
        if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
                if (retval == 0) {
@@ -182,9 +181,9 @@ void *dstmAccept(void *acceptfd)
                                        retval);
                        else
                        { //TODO: execute run method on this global thread object
-                               printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=%d\n", oid);
+                               printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=0x%x\n", oid);
                                objType = getObjType(oid);
-                               printf("dstmAccept(): type of object %d is %d\n", oid, objType);
+                               printf("dstmAccept(): type of object 0x%x is %d\n", oid, objType);
                        }
                        break;
 
@@ -195,8 +194,6 @@ void *dstmAccept(void *acceptfd)
        /* Close connection */
        if (close((int)acceptfd) == -1)
                perror("close");
-       else 
-               printf("Closed connection: fd = %d\n", (int)acceptfd);
        
        pthread_exit(NULL);
 }
index cc989a289707f29d0c3d7cd0206b0388acff467e..3076097815c200da32930c69a2b2e5425f75939c 100644 (file)
@@ -9,6 +9,36 @@
 ***************************************************************************************************/
 #include "llookup.h"
 
+#ifdef SIMPLE_LLOOKUP
+
+extern unsigned int *hostIpAddrs;
+extern unsigned int oidsPerBlock;
+
+unsigned int lhashCreate(unsigned int size, float loadfactor)
+{
+       return 0;
+}
+
+unsigned int lhashInsert(unsigned int oid, unsigned int mid)
+{
+       return 0;
+}
+
+unsigned int lhashSearch(unsigned int oid)
+{
+       if (oidsPerBlock == 0)
+               return hostIpAddrs[0];
+       else
+               return hostIpAddrs[oid / oidsPerBlock];
+}
+
+unsigned int lhashRemove(unsigned int oid)
+{
+       return 0;
+}
+
+#else
+
 lhashtable_t llookup;          //Global Hash table
 
 // Creates a hash table with size and an array of lhashlistnode_t 
@@ -193,3 +223,6 @@ unsigned int lhashResize(unsigned int newsize) {
        free(ptr);              //Free the memory of the old hash table 
        return 0;
 }
+
+#endif
+
index 7a70b9bda353f3aa515ec24df5ca21ef9d8bc902..20dbac7251694ea8d0d1b5c9d77914c70f03a877 100644 (file)
@@ -5,6 +5,8 @@
 #include <stdio.h>
 #include <pthread.h>
 
+#define SIMPLE_LLOOKUP
+
 #define LOADFACTOR 0.75
 #define HASH_SIZE 100
 
@@ -22,11 +24,17 @@ typedef struct lhashtable {
        pthread_mutex_t locktable;
 } lhashtable_t;
 
-unsigned int lhashCreate(unsigned int size, float loadfactor);// returns 0 for success and 0 for failure
-unsigned int lhashFunction(unsigned int oid); // returns 0 for success and 0 for failure
-unsigned int lhashInsert(unsigned int oid, unsigned int mid); // returns 0 for success and 0 for failure
-unsigned int lhashSearch(unsigned int oid); //returns mid, 0 if not found
-unsigned int lhashRemove(unsigned int oid); //returns 0 if not success
-unsigned int lhashResize(unsigned int newsize);  // returns 0 for success and 0 for failure
+//returns 0 for success and 1 for failure
+unsigned int lhashCreate(unsigned int size, float loadfactor);
+//returns 0 for success and 1 for failure
+unsigned int lhashInsert(unsigned int oid, unsigned int mid);
+//returns mid, 0 if not found
+unsigned int lhashSearch(unsigned int oid);
+//returns 0 for success and 1 for failure
+unsigned int lhashRemove(unsigned int oid);
+
+//helper functions
+unsigned int lhashResize(unsigned int newsize);
+unsigned int lhashFunction(unsigned int oid);
 
 #endif
index c5f3500ade25fcfc69c4aeafc1d6682e364bf525..487b3cb3c40357e0a0bf664ca52eae0d7af2cf81 100644 (file)
@@ -23,6 +23,7 @@
 #define RECEIVE_BUFFER_SIZE 2048
 #define NUM_THREADS 10
 #define PREFETCH_CACHE_SIZE 1048576 //1MB
+#define CONFIG_FILENAME "dstm.conf"
 
 /* Global Variables */
 extern int classsize[];
@@ -34,6 +35,13 @@ pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch qu
 pthread_t tPrefetch;
 extern objstr_t *mainobjstore;
 unsigned int myIpAddr;
+unsigned int *hostIpAddrs;
+int sizeOfHostArray;
+int numHostsInSystem;
+int myIndexInHostArray;
+unsigned int oidsPerBlock;
+unsigned int oidMin;
+unsigned int oidMax;
 
 plistnode_t *createPiles(transrecord_t *);
 inline int arrayLength(int *array) {
@@ -87,7 +95,8 @@ int dstmStartup(const char * option) {
   pthread_attr_t attr;
   int master=strcmp(option, "master")==0;
 
-  myIpAddr = getMyIpAddr("eth0");
+       if (processConfigFile() != 0)
+               return 0; //TODO: return error value, cause main program to exit
 
   dstmInit();
   transInit();
@@ -1558,3 +1567,113 @@ int startRemoteThread(unsigned int oid, unsigned int mid)
        return status;
 }
 
+//TODO: when reusing oids, make sure they are not already in use!
+unsigned int getNewOID(void) {
+       static unsigned int id = 0xFFFFFFFF;
+       
+       id += 2;
+       if (id > oidMax || id < oidMin)
+       {
+               id = (oidMin | 1);
+       }
+       return id;
+}
+
+int processConfigFile()
+{
+       FILE *configFile;
+       const int maxLineLength = 200;
+       char lineBuffer[maxLineLength];
+       char *token;
+       const char *delimiters = " \t\n";
+       char *commentBegin;
+       in_addr_t tmpAddr;
+       
+       configFile = fopen(CONFIG_FILENAME, "r");
+       if (configFile == NULL)
+       {
+               printf("error opening %s:\n", CONFIG_FILENAME);
+               perror("");
+               return -1;
+       }
+
+       numHostsInSystem = 0;
+       sizeOfHostArray = 8;
+       hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
+       
+       while(fgets(lineBuffer, maxLineLength, configFile) != NULL)
+       {
+               commentBegin = strchr(lineBuffer, '#');
+               if (commentBegin != NULL)
+                       *commentBegin = '\0';
+               token = strtok(lineBuffer, delimiters);
+               while (token != NULL)
+               {
+                       tmpAddr = inet_addr(token);
+                       if ((int)tmpAddr == -1)
+                       {
+                               printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
+                               fclose(configFile);
+                               return -1;
+                       }
+                       else
+                               addHost(htonl(tmpAddr));
+                       token = strtok(NULL, delimiters);
+               }
+       }
+
+       fclose(configFile);
+       
+       if (numHostsInSystem < 1)
+       {
+               printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
+               return -1;
+       }
+       myIpAddr = getMyIpAddr("eth0");
+       myIndexInHostArray = findHost(myIpAddr);
+       if (myIndexInHostArray == -1)
+       {
+               printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
+               return -1;
+       }
+       oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
+       oidMin = oidsPerBlock * myIndexInHostArray;
+       if (myIndexInHostArray == numHostsInSystem - 1)
+               oidMax = 0xFFFFFFFF;
+       else
+               oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
+
+       return 0;
+}
+
+void addHost(unsigned int hostIp)
+{
+       unsigned int *tmpArray;
+
+       if (findHost(hostIp) != -1)
+               return;
+
+       if (numHostsInSystem == sizeOfHostArray)
+       {
+               tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
+               memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
+               free(hostIpAddrs);
+               hostIpAddrs = tmpArray;
+       }
+
+       hostIpAddrs[numHostsInSystem++] = hostIp;
+
+       return;
+}
+
+int findHost(unsigned int hostIp)
+{
+       int i;
+       for (i = 0; i < numHostsInSystem; i++)
+               if (hostIpAddrs[i] == hostIp)
+                       return i;
+
+       //not found
+       return -1;
+}
+