Trans commit initial version
authoradash <adash>
Thu, 22 Mar 2007 17:56:48 +0000 (17:56 +0000)
committeradash <adash>
Thu, 22 Mar 2007 17:56:48 +0000 (17:56 +0000)
TODO client handles TRANS_AGREE and TRANS_DISAGREE with TRANS_COMMIT

Robust/src/Runtime/DSTM/interface/Makefile
Robust/src/Runtime/DSTM/interface/clookup.h
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/plookup.c [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/plookup.h [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/trans.c

index a2a65d4d0c027e7155bd8f07c4815823e6e138f3..19709dccdbc4d1147411b761089d00e191c0a962 100644 (file)
@@ -1,8 +1,8 @@
 client:
-       gcc -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c 
+       gcc -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c
 
 server:
-       gcc -g -o server dstmserver.c testserver.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c 
+       gcc -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c 
 
 clean:
        rm client server
index 8b8df2cf54b34984f803c56b841e083d5a5b220c..b71abfd5f443c1275be6290beed8dbd0a751fc46 100644 (file)
@@ -13,7 +13,7 @@ typedef struct chashlistnode {
        struct chashlistnode *next;
 } chashlistnode_t;
 
-typedef struct cashehashtable {
+typedef struct chashtable {
        chashlistnode_t *table; // points to beginning of hash table
        unsigned int size;
        unsigned int numelements;
index eb82144a00fb4db64a3b09277bab31f1d5e0cdf5..7ac0706a59af55bf5bbadf59fa92f891524a231f 100644 (file)
@@ -16,8 +16,9 @@
 #define OBJECTS_FOUND          10
 #define OBJECTS_NOT_FOUND      11
 #define TRANS_AGREE            12
-#define TRANS_DISAGREE         13
-#define TRANS_SUCESSFUL                14
+#define TRANS_DISAGREE         13//for soft abort
+#define TRANS_DISAGREE_ABORT   14//for hard abort
+#define TRANS_SUCESSFUL                15//Not necessary for now
 
 #include <stdlib.h>
 #include <stdio.h>
@@ -28,6 +29,7 @@
 //bit designations for status field of objheader
 #define DIRTY 0x01
 #define NEW   0x02
+#define LOCK  0x04
 
 typedef struct objheader {
        unsigned int oid;
@@ -48,6 +50,12 @@ typedef struct transrecord {
        chashtable_t *lookupTable;
 } transrecord_t;
 
+typedef struct pile {
+       unsigned int mid;
+       unsigned int oid;
+       struct pile *next;
+}pile_t;
+
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
 
index fca266442d51c00dddbd6ac148cba62c246cb1c2..ae334255570cf5ec56aa9a7417d1b981d9e6f838 100644 (file)
@@ -124,6 +124,12 @@ void *dstmAccept(void *acceptfd)
                        case MOVE_MULT_REQUEST:
                                break;
                        case TRANS_REQUEST:
+                               printf("Client sent %d\n",buffer[0]);
+                               int offset = 1;
+                               printf("Num Read  %d\n",*((short*)(buffer+offset)));
+                               offset += sizeof(short);
+                               printf("Num modified  %d\n",*((short*)(buffer+offset)));
+                               handleTransReq(acceptfd, buffer);
                                break;
                        case TRANS_ABORT:
                                break;
@@ -144,3 +150,170 @@ void *dstmAccept(void *acceptfd)
        pthread_exit(NULL);
 }
 
+//TOOD put __FILE__ __LINE__ for all error conditions
+int handleTransReq(int acceptfd, char *buf) {
+       short numread = 0, nummod = 0;
+       char control;
+       int offset = 0, size,i;
+       int objnotfound = 0, transdis = 0, transabort = 0, transagree = 0;
+       objheader_t *headptr = NULL;
+       objstr_t *tmpholder;
+       void *top, *mobj;
+       char sendbuf[RECEIVE_BUFFER_SIZE];
+
+       control = buf[0];
+       offset = 1;
+       numread = *((short *)(buf+offset));
+       offset += sizeof(short);
+       nummod = *((short *)(buf+offset));
+       offset += sizeof(short);
+       if (numread) {
+               //Make an array to store the object headers for all objects that are only read
+               if ((headptr = calloc(numread, sizeof(objheader_t))) == NULL) {
+                       perror("handleTransReq: Calloc error");
+                       return 1;
+               }
+               //Process each object id that is only read
+               for (i = 0; i < numread; i++) {
+                       objheader_t *tmp;
+                       tmp = (objheader_t *) (buf + offset);
+                       //find if object is still present in the same machine since TRANS_REQUEST
+                       if ((mobj = mhashSearch(tmp->oid)) == NULL) {
+                               objnotfound++;
+                               /*
+                               sendbuf[0] = OBJECT_NOT_FOUND;
+                               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                                       perror("");
+                               }
+                               */
+                               } else { // If obj found in machine (i.e. has not moved)
+                               //Check if obj is locked
+                               if ((((objheader_t *)mobj)->status >> 3) == 1) {
+                                       //Check version of the object
+                                       if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
+                                               transdis++;
+                                               /*
+                                               sendbuf[0] = TRANS_DISAGREE;
+                                               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                                                       perror("");
+                                               }
+                                       */
+                                       } else {//If versions don't match ..HARD ABORT
+                                               transabort++;
+                                               /*
+                                               sendbuf[0] = TRANS_DISAGREE_ABORT;
+                                               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                                                       perror("");
+                                               }
+                                               */
+                                       }
+                               } else {// If object not locked then lock it
+                                       ((objheader_t *)mobj)->status |= LOCK;
+                                       if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
+                                               transagree++;
+                                               /*
+                                               sendbuf[0] = TRANS_AGREE;
+                                               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                                                       perror("");
+                                               }
+                                               */
+                                       } else {//If versions don't match
+                                               transabort++;
+                                               /*
+                                               sendbuf[0] = TRANS_DISAGREE_ABORT;
+                                               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                                                       perror("");
+                                               }
+                                               */
+                                       }
+                               }
+                       }       
+                       memcpy(headptr, buf+offset, sizeof(objheader_t));
+                       offset += sizeof(objheader_t);
+               }
+       }
+       if (nummod) {
+               if((tmpholder = objstrCreate(RECEIVE_BUFFER_SIZE)) == NULL) {
+                       perror("handleTransReq: Calloc error");
+                       return 1;
+               }
+               
+               //Process each object id that is only modified
+               for(i = 0; i < nummod; i++) {
+                       objheader_t *tmp;
+                       tmp = (objheader_t *)(buf + offset);
+                       //find if object is still present in the same machine since TRANS_REQUEST
+                       if ((mobj = mhashSearch(tmp->oid)) == NULL) {
+                               objnotfound++;
+                               /*
+                                  sendbuf[0] = OBJECT_NOT_FOUND;
+                                  if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                                  perror("");
+                                  }
+                                */
+                       } else { // If obj found in machine (i.e. has not moved)
+                               //Check if obj is locked
+                               if ((((objheader_t *)mobj)->status >> 3) == 1) {
+                                       //Check version of the object
+                                       if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
+                                               transdis++;
+                                               /*
+                                                  sendbuf[0] = TRANS_DISAGREE;
+                                                  if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                                                  perror("");
+                                                  }
+                                                */
+                                       } else {//If versions don't match ..HARD ABORT
+                                               transabort++;
+                                               /*
+                                                  sendbuf[0] = TRANS_DISAGREE_ABORT;
+                                                  if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                                                  perror("");
+                                                  }
+                                                */
+                                       }
+                               } else {// If object not locked then lock it
+                                       ((objheader_t *)mobj)->status |= LOCK;
+                                       if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
+                                               transagree++;
+                                               /*
+                                                  sendbuf[0] = TRANS_AGREE;
+                                                  if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                                                  perror("");
+                                                  }
+                                                */
+                                       } else {//If versions don't match
+                                               transabort++;
+                                               /*
+                                                  sendbuf[0] = TRANS_DISAGREE_ABORT;
+                                                  if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                                                  perror("");
+                                                  }
+                                                */
+                                       }
+                               }
+                       }       
+
+                       size = sizeof(objheader_t) + classsize[tmp->type];
+                       if ((top = objstrAlloc(tmpholder, size)) == NULL) {
+                                       perror("handleTransReq: Calloc error");
+                                       return 1;
+                       }
+                       memcpy(top, buf+offset, size);
+                       offset += size;
+               }
+       }
+       if(transabort > 0) {
+               sendbuf[0] = TRANS_DISAGREE_ABORT;
+               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                       perror("");
+               }
+       
+       } else {
+               sendbuf[0] = TRANS_AGREE;
+               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+                       perror("");
+               }
+       }
+       return 0;
+}
diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c
new file mode 100644 (file)
index 0000000..05ea342
--- /dev/null
@@ -0,0 +1,75 @@
+ #include "plookup.h"
+
+plistnode_t *pCreate(int objects) {
+       plistnode_t *pile;
+       
+       //Create main structure
+       if((pile = calloc(1, sizeof(plistnode_t))) == NULL) {
+               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               return NULL;
+       }       
+       pile->next = NULL;
+       //Create array of objects
+       if((pile->obj = calloc(objects, sizeof(unsigned int))) == NULL) {
+               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+       pile->index = 0;
+       pile->vote = 0;
+       return pile;
+}
+
+unsigned int pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int num_objs) {
+       plistnode_t *ptr, *tmp;
+       int found = 0;
+
+       tmp = pile;
+       //Add oid into a machine that is a part of the pile linked list structure
+       while(tmp != NULL) {
+               if (tmp->mid == mid) {
+                       tmp->obj[tmp->index] = oid;
+                       tmp->index++;
+                       found = 1;
+                       break;
+               }
+               tmp = tmp->next;
+       }
+       //Add oid for any new machine 
+       if (!found) {
+               if((ptr = pCreate(num_objs)) == NULL) {
+                       return 1;
+               }
+               ptr->mid = mid;
+               ptr->obj[ptr->index] = oid;
+               ptr->index++;
+               ptr->next = pile;
+               pile = ptr;
+       }
+       return 0;
+}
+
+// Return objects for a given mid
+unsigned int *pSearch(plistnode_t *pile, unsigned int mid) {
+       plistnode_t *tmp;
+       tmp = pile;
+       while(tmp != NULL) {
+               if(tmp->mid == mid) {
+                       return(tmp->obj);
+               }
+               tmp = tmp->next;
+       }
+       return NULL;
+}
+
+//Delete the entire pile
+void pDelete(plistnode_t *pile) {
+       plistnode_t *next, *tmp;
+       tmp = pile;
+       while(tmp != NULL) {
+               next = tmp->next;
+               free(tmp->obj);
+               free(tmp);
+               tmp = next;
+       }
+       return;
+}
diff --git a/Robust/src/Runtime/DSTM/interface/plookup.h b/Robust/src/Runtime/DSTM/interface/plookup.h
new file mode 100644 (file)
index 0000000..a1ee01b
--- /dev/null
@@ -0,0 +1,21 @@
+#ifndef _PLOOKUP_H_
+#define _PLOOKUP_H_
+
+#include <stdlib.h>
+#include <stdio.h>
+
+typedef struct plistnode {
+       unsigned int mid;
+       unsigned int *obj; //this can be cast to another type or used to point to a larger structure
+       int index;
+       int vote;
+       struct plistnode *next;
+} plistnode_t;
+
+plistnode_t  *pCreate(int);
+unsigned int pInsert(plistnode_t*, unsigned int mid, unsigned int oid, int);
+unsigned int *pSearch(plistnode_t *, unsigned int mid);
+void pDelete(plistnode_t *);
+
+#endif
+
index 7934b6a15be14a57aa383b4a9387973d9c268538..d852dcce8df7caf1a22408436d99f13829fbf288 100644 (file)
@@ -2,6 +2,7 @@
 #include "clookup.h"
 #include "mlookup.h"
 #include "llookup.h"
+#include "plookup.h"
 #include<sys/types.h>
 #include<sys/socket.h>
 #include<netdb.h>
@@ -43,10 +44,12 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
                chashInsert(record->lookupTable, objheader->oid, objcopy); 
                return(objcopy);
        } else {
+               //Get the object from the remote location
                printf("oid not found in local machine lookup\n");
                machinenumber = lhashSearch(oid);
                objcopy = getRemoteObj(record, machinenumber, oid);
                if(objcopy == NULL)
+                       //If object is not found in Remote location
                        printf("Object not found in Machine %d\n", machinenumber);
                else
                        return(objcopy);
@@ -66,10 +69,203 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
 }
 
 int transCommit(transrecord_t *record){        
-       //Move objects to machine that hosts it 
+       chashlistnode_t *curr, *ptr, *next;
+       unsigned int size;//Represents number of bins in the chash table
+       unsigned int machinenum;
+       objheader_t *headeraddr, *localheaderaddr;
+       plistnode_t *tmp, *pile = NULL;
+       int sd,n,i;
+       short numread = 0,nummod = 0;
+       struct sockaddr_in serv_addr;
+       struct hostent *server;
+       char buffer[RECEIVE_BUFFER_SIZE],control;
+       
+       ptr = record->lookupTable->table;
+       size = record->lookupTable->size;
+       //Look through all the objects in the cache and make pils
+       //Outer loop for chashtable
+       for(i = 0; i < size ;i++) {
+               curr = &ptr[i];
+               //Inner loop to traverse the linked list of the cache lookupTable
+               while(curr != NULL) {
+                       //if the first bin in hash table is empty
+                       if(curr->key == 0) {
+                               break;
+                       }
+                       next = curr->next;
+                       //Get machine location for object id
+                       machinenum = lhashSearch(curr->key);
+                       // Make piles
+                       pInsert(pile, machinenum, curr->key, record->lookupTable->numelements); 
+                       curr = next;
+               }
+       }
+
+       tmp = pile;     
+       unsigned int oidmod[record->lookupTable->numelements];
+       unsigned int oidread[record->lookupTable->numelements];
+       //Process each machine in pile
+       while(tmp != NULL) {
+               //Identify which oids have been updated and which ones have been just read
+               for(i = 0; i < pile->index; i++) {
+                       headeraddr = (objheader_t *) chashSearch(record->lookupTable, pile->obj[i]);
+                       //check if object modified in cache  ??
+                       if(headeraddr->status >>= DIRTY){
+                               //Keep track of oids that have been modified    
+                               oidmod[nummod] = headeraddr->oid;
+                               nummod++;
+                       } else {
+                               oidread[numread] = headeraddr->oid;
+                               numread++;
+                       }
+               }
+               //Send Trans Request in the form
+               if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+                       perror("Error in socket for TRANS_REQUEST");
+                       return 1;
+               }
+               bzero((char*) &serv_addr, sizeof(serv_addr));
+               serv_addr.sin_family = AF_INET;
+               serv_addr.sin_port = htons(LISTEN_PORT);
+               serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+               //serv_addr.sin_addr.s_addr = inet_addr(pile->mid);
+
+               if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+                       perror("Error in connect for TRANS_REQUEST");
+                       return 1;
+               }
+               
+               bzero((char *)buffer,sizeof(buffer));
+               control = TRANS_REQUEST;
+               buffer[0] = control;
+               //Send numread, nummod, sizeof header for objects read, size of header+objects that are modified
+               int offset = 1;
+               memcpy(buffer+offset, &numread, sizeof(short));
+               offset += sizeof(short);
+               memcpy(buffer+offset, &nummod, sizeof(short));
+               offset += sizeof(short);
+               for( i= 0; i< numread; i++) {
+                       headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidread[i]);
+                       memcpy(buffer+offset, headeraddr, sizeof(objheader_t));
+                       offset += sizeof(objheader_t);
+               }
+               for( i= 0; i< nummod; i++) {
+                       headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidmod[i]);
+                       memcpy(buffer+offset, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]);
+                       offset += sizeof(objheader_t) + classsize[headeraddr->type];
+               }
+               if (offset > RECEIVE_BUFFER_SIZE) {
+                       printf("Error: Buffersize too small");
+               }
+               if (write(sd, buffer, sizeof(buffer)) < 0) {
+                       perror("Error sending message");
+                       return 1;
+               }
+#ifdef DEBUG1
+               printf("DEBUG -> ready to rcv ...\n");
+#endif
+               read(sd, buffer, sizeof(buffer));
+               close(sd);
+               printf("Server sent %d\n",buffer[0]);
+               /*
+               if (buffer[0] == TRANS_AGREE) {
+                       //change machine pile
+                       
+               } 
+               */
+               //Reset numread and nummod for the next pile
+              numread = nummod = 0;    
+              tmp = tmp->next;
+
+       }
 
 }
 
+
+#if 0
+int transCommit(transrecord_t *record){        
+       //Look through all the objects in the cache
+       int i,numelements,isFirst;
+       unsigned int size,machinenum;//Represents number of buckets
+       void *address;
+       objheader_t *headeraddr,localheaderaddr;
+       chashlistnode_t *curr, *ptr, *next;
+       int sd, size;
+       struct sockaddr_in serv_addr;
+       struct hostent *server;
+       char buffer[RECEIVE_BUFFER_SIZE],control;
+       
+       ptr = record->lookupTable->table;
+       size = record->lookupTable->size;
+       //Outer loop for chashtable
+       for(i = 0; i< size ;i++) {
+               curr = &ptr[i];
+               //Inner look to traverse the linked list of the cache lookupTable
+               while(curr != NULL) {
+                       if(curr->key == 0) {
+                               break;
+                       }
+                       //Find if local or remote
+                       address = mhashSearch(curr->key);
+                       d
+                       localheaderaddr = (objheader_t *) curr->value;
+                       if(address != NULL) {
+                               //Is local so  check if the local copy has been updated 
+                               headeraddr = (objheader_t *) address;                           
+                               if(localheaderaddr->version == headeraddr->version){
+                                       //Lock Object
+                                       
+                               }
+                               else {
+                                       //vote as DISAGREE
+                                       //Start TransAbort();
+                                       //Unlock object
+                               }
+                       }
+                       else {
+                               //Is remote
+                               //Find which machine it belongs to
+                               machinenum = lhashSearch(curr->key);
+                               //Start TRANS_REQUEST to machine
+
+                               if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+                                       perror("Error in socket");
+                                       return NULL;
+                               }
+                               bzero((char*) &serv_addr, sizeof(serv_addr));
+                               serv_addr.sin_family = AF_INET;
+                               serv_addr.sin_port = htons(LISTEN_PORT);
+                               serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+
+                               if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+                                       perror("Error in connect");
+                                       return NULL;
+                               }
+                               bzero((char *)buffer,sizeof(buffer));
+                               control = READ_REQUEST;
+                               buffer[0] = control;
+                               memcpy(buffer+1, &oid, sizeof(int));
+                               if (write(sd, buffer, sizeof(buffer)) < 0) {
+                                       perror("Error sending message");
+                                       return NULL;
+                               }
+
+#ifdef DEBUG1
+                               printf("DEBUG -> ready to rcv ...\n");
+#endif
+                               read(sd, buffer, sizeof(buffer));
+                               close(sd);
+
+                       }
+                       next = curr->next;
+               }
+               curr = next;
+       }       
+
+}
+#endif
+
+
 int transAbort(transrecord_t *record){
 
 }