From ea370f8004f2cd7db130a5af2c3e68bbe1f9d1b7 Mon Sep 17 00:00:00 2001 From: adash Date: Thu, 22 Mar 2007 17:56:48 +0000 Subject: [PATCH] Trans commit initial version TODO client handles TRANS_AGREE and TRANS_DISAGREE with TRANS_COMMIT --- Robust/src/Runtime/DSTM/interface/Makefile | 4 +- Robust/src/Runtime/DSTM/interface/clookup.h | 2 +- Robust/src/Runtime/DSTM/interface/dstm.h | 12 +- .../src/Runtime/DSTM/interface/dstmserver.c | 173 +++++++++++++++ Robust/src/Runtime/DSTM/interface/plookup.c | 75 +++++++ Robust/src/Runtime/DSTM/interface/plookup.h | 21 ++ Robust/src/Runtime/DSTM/interface/trans.c | 198 +++++++++++++++++- 7 files changed, 479 insertions(+), 6 deletions(-) create mode 100644 Robust/src/Runtime/DSTM/interface/plookup.c create mode 100644 Robust/src/Runtime/DSTM/interface/plookup.h diff --git a/Robust/src/Runtime/DSTM/interface/Makefile b/Robust/src/Runtime/DSTM/interface/Makefile index a2a65d4d..19709dcc 100644 --- a/Robust/src/Runtime/DSTM/interface/Makefile +++ b/Robust/src/Runtime/DSTM/interface/Makefile @@ -1,8 +1,8 @@ client: - gcc -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c + gcc -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c server: - gcc -g -o server dstmserver.c testserver.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c + gcc -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c clean: rm client server diff --git a/Robust/src/Runtime/DSTM/interface/clookup.h b/Robust/src/Runtime/DSTM/interface/clookup.h index 8b8df2cf..b71abfd5 100644 --- a/Robust/src/Runtime/DSTM/interface/clookup.h +++ b/Robust/src/Runtime/DSTM/interface/clookup.h @@ -13,7 +13,7 @@ typedef struct chashlistnode { struct chashlistnode *next; } chashlistnode_t; -typedef struct cashehashtable { +typedef struct chashtable { chashlistnode_t *table; // points to beginning of hash table unsigned int size; unsigned int numelements; diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index eb82144a..7ac0706a 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -16,8 +16,9 @@ #define OBJECTS_FOUND 10 #define OBJECTS_NOT_FOUND 11 #define TRANS_AGREE 12 -#define TRANS_DISAGREE 13 -#define TRANS_SUCESSFUL 14 +#define TRANS_DISAGREE 13//for soft abort +#define TRANS_DISAGREE_ABORT 14//for hard abort +#define TRANS_SUCESSFUL 15//Not necessary for now #include #include @@ -28,6 +29,7 @@ //bit designations for status field of objheader #define DIRTY 0x01 #define NEW 0x02 +#define LOCK 0x04 typedef struct objheader { unsigned int oid; @@ -48,6 +50,12 @@ typedef struct transrecord { chashtable_t *lookupTable; } transrecord_t; +typedef struct pile { + unsigned int mid; + unsigned int oid; + struct pile *next; +}pile_t; + /* Initialize main object store and lookup tables, start server thread. */ int dstmInit(void); diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index fca26644..ae334255 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -124,6 +124,12 @@ void *dstmAccept(void *acceptfd) case MOVE_MULT_REQUEST: break; case TRANS_REQUEST: + printf("Client sent %d\n",buffer[0]); + int offset = 1; + printf("Num Read %d\n",*((short*)(buffer+offset))); + offset += sizeof(short); + printf("Num modified %d\n",*((short*)(buffer+offset))); + handleTransReq(acceptfd, buffer); break; case TRANS_ABORT: break; @@ -144,3 +150,170 @@ void *dstmAccept(void *acceptfd) pthread_exit(NULL); } +//TOOD put __FILE__ __LINE__ for all error conditions +int handleTransReq(int acceptfd, char *buf) { + short numread = 0, nummod = 0; + char control; + int offset = 0, size,i; + int objnotfound = 0, transdis = 0, transabort = 0, transagree = 0; + objheader_t *headptr = NULL; + objstr_t *tmpholder; + void *top, *mobj; + char sendbuf[RECEIVE_BUFFER_SIZE]; + + control = buf[0]; + offset = 1; + numread = *((short *)(buf+offset)); + offset += sizeof(short); + nummod = *((short *)(buf+offset)); + offset += sizeof(short); + if (numread) { + //Make an array to store the object headers for all objects that are only read + if ((headptr = calloc(numread, sizeof(objheader_t))) == NULL) { + perror("handleTransReq: Calloc error"); + return 1; + } + //Process each object id that is only read + for (i = 0; i < numread; i++) { + objheader_t *tmp; + tmp = (objheader_t *) (buf + offset); + //find if object is still present in the same machine since TRANS_REQUEST + if ((mobj = mhashSearch(tmp->oid)) == NULL) { + objnotfound++; + /* + sendbuf[0] = OBJECT_NOT_FOUND; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + */ + } else { // If obj found in machine (i.e. has not moved) + //Check if obj is locked + if ((((objheader_t *)mobj)->status >> 3) == 1) { + //Check version of the object + if (tmp->version == ((objheader_t *)mobj)->version) {//If version match + transdis++; + /* + sendbuf[0] = TRANS_DISAGREE; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + */ + } else {//If versions don't match ..HARD ABORT + transabort++; + /* + sendbuf[0] = TRANS_DISAGREE_ABORT; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + */ + } + } else {// If object not locked then lock it + ((objheader_t *)mobj)->status |= LOCK; + if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match + transagree++; + /* + sendbuf[0] = TRANS_AGREE; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + */ + } else {//If versions don't match + transabort++; + /* + sendbuf[0] = TRANS_DISAGREE_ABORT; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + */ + } + } + } + memcpy(headptr, buf+offset, sizeof(objheader_t)); + offset += sizeof(objheader_t); + } + } + if (nummod) { + if((tmpholder = objstrCreate(RECEIVE_BUFFER_SIZE)) == NULL) { + perror("handleTransReq: Calloc error"); + return 1; + } + + //Process each object id that is only modified + for(i = 0; i < nummod; i++) { + objheader_t *tmp; + tmp = (objheader_t *)(buf + offset); + //find if object is still present in the same machine since TRANS_REQUEST + if ((mobj = mhashSearch(tmp->oid)) == NULL) { + objnotfound++; + /* + sendbuf[0] = OBJECT_NOT_FOUND; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + */ + } else { // If obj found in machine (i.e. has not moved) + //Check if obj is locked + if ((((objheader_t *)mobj)->status >> 3) == 1) { + //Check version of the object + if (tmp->version == ((objheader_t *)mobj)->version) {//If version match + transdis++; + /* + sendbuf[0] = TRANS_DISAGREE; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + */ + } else {//If versions don't match ..HARD ABORT + transabort++; + /* + sendbuf[0] = TRANS_DISAGREE_ABORT; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + */ + } + } else {// If object not locked then lock it + ((objheader_t *)mobj)->status |= LOCK; + if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match + transagree++; + /* + sendbuf[0] = TRANS_AGREE; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + */ + } else {//If versions don't match + transabort++; + /* + sendbuf[0] = TRANS_DISAGREE_ABORT; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + */ + } + } + } + + size = sizeof(objheader_t) + classsize[tmp->type]; + if ((top = objstrAlloc(tmpholder, size)) == NULL) { + perror("handleTransReq: Calloc error"); + return 1; + } + memcpy(top, buf+offset, size); + offset += size; + } + } + if(transabort > 0) { + sendbuf[0] = TRANS_DISAGREE_ABORT; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + + } else { + sendbuf[0] = TRANS_AGREE; + if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) { + perror(""); + } + } + return 0; +} diff --git a/Robust/src/Runtime/DSTM/interface/plookup.c b/Robust/src/Runtime/DSTM/interface/plookup.c new file mode 100644 index 00000000..05ea3427 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/plookup.c @@ -0,0 +1,75 @@ + #include "plookup.h" + +plistnode_t *pCreate(int objects) { + plistnode_t *pile; + + //Create main structure + if((pile = calloc(1, sizeof(plistnode_t))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return NULL; + } + pile->next = NULL; + //Create array of objects + if((pile->obj = calloc(objects, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s %d\n", __FILE__, __LINE__); + return NULL; + } + pile->index = 0; + pile->vote = 0; + return pile; +} + +unsigned int pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int num_objs) { + plistnode_t *ptr, *tmp; + int found = 0; + + tmp = pile; + //Add oid into a machine that is a part of the pile linked list structure + while(tmp != NULL) { + if (tmp->mid == mid) { + tmp->obj[tmp->index] = oid; + tmp->index++; + found = 1; + break; + } + tmp = tmp->next; + } + //Add oid for any new machine + if (!found) { + if((ptr = pCreate(num_objs)) == NULL) { + return 1; + } + ptr->mid = mid; + ptr->obj[ptr->index] = oid; + ptr->index++; + ptr->next = pile; + pile = ptr; + } + return 0; +} + +// Return objects for a given mid +unsigned int *pSearch(plistnode_t *pile, unsigned int mid) { + plistnode_t *tmp; + tmp = pile; + while(tmp != NULL) { + if(tmp->mid == mid) { + return(tmp->obj); + } + tmp = tmp->next; + } + return NULL; +} + +//Delete the entire pile +void pDelete(plistnode_t *pile) { + plistnode_t *next, *tmp; + tmp = pile; + while(tmp != NULL) { + next = tmp->next; + free(tmp->obj); + free(tmp); + tmp = next; + } + return; +} diff --git a/Robust/src/Runtime/DSTM/interface/plookup.h b/Robust/src/Runtime/DSTM/interface/plookup.h new file mode 100644 index 00000000..a1ee01b1 --- /dev/null +++ b/Robust/src/Runtime/DSTM/interface/plookup.h @@ -0,0 +1,21 @@ +#ifndef _PLOOKUP_H_ +#define _PLOOKUP_H_ + +#include +#include + +typedef struct plistnode { + unsigned int mid; + unsigned int *obj; //this can be cast to another type or used to point to a larger structure + int index; + int vote; + struct plistnode *next; +} plistnode_t; + +plistnode_t *pCreate(int); +unsigned int pInsert(plistnode_t*, unsigned int mid, unsigned int oid, int); +unsigned int *pSearch(plistnode_t *, unsigned int mid); +void pDelete(plistnode_t *); + +#endif + diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 7934b6a1..d852dcce 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -2,6 +2,7 @@ #include "clookup.h" #include "mlookup.h" #include "llookup.h" +#include "plookup.h" #include #include #include @@ -43,10 +44,12 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) chashInsert(record->lookupTable, objheader->oid, objcopy); return(objcopy); } else { + //Get the object from the remote location printf("oid not found in local machine lookup\n"); machinenumber = lhashSearch(oid); objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) + //If object is not found in Remote location printf("Object not found in Machine %d\n", machinenumber); else return(objcopy); @@ -66,10 +69,203 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type) } int transCommit(transrecord_t *record){ - //Move objects to machine that hosts it + chashlistnode_t *curr, *ptr, *next; + unsigned int size;//Represents number of bins in the chash table + unsigned int machinenum; + objheader_t *headeraddr, *localheaderaddr; + plistnode_t *tmp, *pile = NULL; + int sd,n,i; + short numread = 0,nummod = 0; + struct sockaddr_in serv_addr; + struct hostent *server; + char buffer[RECEIVE_BUFFER_SIZE],control; + + ptr = record->lookupTable->table; + size = record->lookupTable->size; + //Look through all the objects in the cache and make pils + //Outer loop for chashtable + for(i = 0; i < size ;i++) { + curr = &ptr[i]; + //Inner loop to traverse the linked list of the cache lookupTable + while(curr != NULL) { + //if the first bin in hash table is empty + if(curr->key == 0) { + break; + } + next = curr->next; + //Get machine location for object id + machinenum = lhashSearch(curr->key); + // Make piles + pInsert(pile, machinenum, curr->key, record->lookupTable->numelements); + curr = next; + } + } + + tmp = pile; + unsigned int oidmod[record->lookupTable->numelements]; + unsigned int oidread[record->lookupTable->numelements]; + //Process each machine in pile + while(tmp != NULL) { + //Identify which oids have been updated and which ones have been just read + for(i = 0; i < pile->index; i++) { + headeraddr = (objheader_t *) chashSearch(record->lookupTable, pile->obj[i]); + //check if object modified in cache ?? + if(headeraddr->status >>= DIRTY){ + //Keep track of oids that have been modified + oidmod[nummod] = headeraddr->oid; + nummod++; + } else { + oidread[numread] = headeraddr->oid; + numread++; + } + } + //Send Trans Request in the form + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("Error in socket for TRANS_REQUEST"); + return 1; + } + bzero((char*) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(LISTEN_PORT); + serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); + //serv_addr.sin_addr.s_addr = inet_addr(pile->mid); + + if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { + perror("Error in connect for TRANS_REQUEST"); + return 1; + } + + bzero((char *)buffer,sizeof(buffer)); + control = TRANS_REQUEST; + buffer[0] = control; + //Send numread, nummod, sizeof header for objects read, size of header+objects that are modified + int offset = 1; + memcpy(buffer+offset, &numread, sizeof(short)); + offset += sizeof(short); + memcpy(buffer+offset, &nummod, sizeof(short)); + offset += sizeof(short); + for( i= 0; i< numread; i++) { + headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidread[i]); + memcpy(buffer+offset, headeraddr, sizeof(objheader_t)); + offset += sizeof(objheader_t); + } + for( i= 0; i< nummod; i++) { + headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidmod[i]); + memcpy(buffer+offset, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]); + offset += sizeof(objheader_t) + classsize[headeraddr->type]; + } + if (offset > RECEIVE_BUFFER_SIZE) { + printf("Error: Buffersize too small"); + } + if (write(sd, buffer, sizeof(buffer)) < 0) { + perror("Error sending message"); + return 1; + } +#ifdef DEBUG1 + printf("DEBUG -> ready to rcv ...\n"); +#endif + read(sd, buffer, sizeof(buffer)); + close(sd); + printf("Server sent %d\n",buffer[0]); + /* + if (buffer[0] == TRANS_AGREE) { + //change machine pile + + } + */ + //Reset numread and nummod for the next pile + numread = nummod = 0; + tmp = tmp->next; + + } } + +#if 0 +int transCommit(transrecord_t *record){ + //Look through all the objects in the cache + int i,numelements,isFirst; + unsigned int size,machinenum;//Represents number of buckets + void *address; + objheader_t *headeraddr,localheaderaddr; + chashlistnode_t *curr, *ptr, *next; + int sd, size; + struct sockaddr_in serv_addr; + struct hostent *server; + char buffer[RECEIVE_BUFFER_SIZE],control; + + ptr = record->lookupTable->table; + size = record->lookupTable->size; + //Outer loop for chashtable + for(i = 0; i< size ;i++) { + curr = &ptr[i]; + //Inner look to traverse the linked list of the cache lookupTable + while(curr != NULL) { + if(curr->key == 0) { + break; + } + //Find if local or remote + address = mhashSearch(curr->key); + d + localheaderaddr = (objheader_t *) curr->value; + if(address != NULL) { + //Is local so check if the local copy has been updated + headeraddr = (objheader_t *) address; + if(localheaderaddr->version == headeraddr->version){ + //Lock Object + + } + else { + //vote as DISAGREE + //Start TransAbort(); + //Unlock object + } + } + else { + //Is remote + //Find which machine it belongs to + machinenum = lhashSearch(curr->key); + //Start TRANS_REQUEST to machine + + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("Error in socket"); + return NULL; + } + bzero((char*) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(LISTEN_PORT); + serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); + + if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { + perror("Error in connect"); + return NULL; + } + bzero((char *)buffer,sizeof(buffer)); + control = READ_REQUEST; + buffer[0] = control; + memcpy(buffer+1, &oid, sizeof(int)); + if (write(sd, buffer, sizeof(buffer)) < 0) { + perror("Error sending message"); + return NULL; + } + +#ifdef DEBUG1 + printf("DEBUG -> ready to rcv ...\n"); +#endif + read(sd, buffer, sizeof(buffer)); + close(sd); + + } + next = curr->next; + } + curr = next; + } + +} +#endif + + int transAbort(transrecord_t *record){ } -- 2.34.1