X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=Robust%2Fsrc%2FRuntime%2FDSTM%2Finterface%2Ftrans.c;h=c4139fc0ee4718c391f8302c293d52ebe593b66c;hb=1e23ae339caa2709f28af2d22e6b34c2c0b01abb;hp=010735901d5283ca9e8924d969ffdaf227a72e9e;hpb=f7c4e59095efda12887fd8c5135c72ad79b7a687;p=IRC.git diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 01073590..c4139fc0 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -1,856 +1,1721 @@ #include "dstm.h" #include "ip.h" #include "clookup.h" +#include "machinepile.h" #include "mlookup.h" #include "llookup.h" #include "plookup.h" -#include -#include -#include -#include -#include -#include -#include -#include - -#define LISTEN_PORT 2156 -#define MACHINE_IP "127.0.0.1" -#define RECEIVE_BUFFER_SIZE 2048 +#include "prelookup.h" +#include "threadnotify.h" +#include "queue.h" +#include "addUdpEnhance.h" +#ifdef COMPILER +#include "thread.h" +#endif + +#define NUM_THREADS 1 +#define PREFETCH_CACHE_SIZE 1048576 //1MB +#define CONFIG_FILENAME "dstm.conf" +/* Global Variables */ extern int classsize[]; -objstr_t *mainobjstore; +objstr_t *prefetchcache; //Global Prefetch cache +pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache +pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */ +extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store +extern prehashtable_t pflookup; //Global Prefetch cache's lookup table +pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue +pthread_t tPrefetch; /* Primary Prefetch thread that processes the prefetch queue */ +extern objstr_t *mainobjstore; +unsigned int myIpAddr; +unsigned int *hostIpAddrs; +int sizeOfHostArray; +int numHostsInSystem; +int myIndexInHostArray; +unsigned int oidsPerBlock; +unsigned int oidMin; +unsigned int oidMax; + +sockPoolHashTable_t *transReadSockPool; +sockPoolHashTable_t *transPrefetchSockPool; +pthread_mutex_t notifymutex; +pthread_mutex_t atomicObjLock; + +/*********************************** + * Global Variables for statistics + **********************************/ +extern int numTransCommit; +extern int numTransAbort; + +void printhex(unsigned char *, int); plistnode_t *createPiles(transrecord_t *); -/* This functions inserts randowm wait delays in the order of msec */ -void randomdelay(void) -{ - struct timespec req, rem; - time_t t; +/******************************* + * Send and Recv function calls + *******************************/ +void send_data(int fd , void *buf, int buflen) { + char *buffer = (char *)(buf); + int size = buflen; + int numbytes; + while (size > 0) { + numbytes = send(fd, buffer, size, MSG_NOSIGNAL); + if (numbytes == -1) { + perror("send"); + exit(-1); + } + buffer += numbytes; + size -= numbytes; + } +} - t = time(NULL); - req.tv_sec = 0; - req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec - nanosleep(&req, &rem); - return; +void recv_data(int fd , void *buf, int buflen) { + char *buffer = (char *)(buf); + int size = buflen; + int numbytes; + while (size > 0) { + numbytes = recv(fd, buffer, size, 0); + if (numbytes == -1) { + perror("recv"); + exit(-1); + } + buffer += numbytes; + size -= numbytes; + } } -transrecord_t *transStart() -{ - transrecord_t *tmp = malloc(sizeof(transrecord_t)); - tmp->cache = objstrCreate(1048576); - tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); - return tmp; +int recv_data_errorcode(int fd , void *buf, int buflen) { + char *buffer = (char *)(buf); + int size = buflen; + int numbytes; + while (size > 0) { + numbytes = recv(fd, buffer, size, 0); + if (numbytes==0) + return 0; + if (numbytes == -1) { + return -1; + } + buffer += numbytes; + size -= numbytes; + } + return 1; +} + +void printhex(unsigned char *ptr, int numBytes) { + int i; + for (i = 0; i < numBytes; i++) { + if (ptr[i] < 16) + printf("0%x ", ptr[i]); + else + printf("%x ", ptr[i]); + } + printf("\n"); + return; +} + +inline int arrayLength(int *array) { + int i; + for(i=0 ;array[i] != -1; i++) + ; + return i; +} + +inline int findmax(int *array, int arraylength) { + int max, i; + max = array[0]; + for(i = 0; i < arraylength; i++){ + if(array[i] > max) { + max = array[i]; + } + } + return max; +} + +/* This function is a prefetch call generated by the compiler that + * populates the shared primary prefetch queue*/ +void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) { + /* Allocate for the queue node*/ + int qnodesize = sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); + char * node= getmemory(qnodesize); + /* Set queue node values */ + int len; + int top=endoffsets[ntuples-1]; + + *((int *)(node))=ntuples; + len = sizeof(int); + memcpy(node+len, oids, ntuples*sizeof(unsigned int)); + memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short)); + memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short)); + + /* Lock and insert into primary prefetch queue */ + movehead(qnodesize); +} + +/* This function starts up the transaction runtime. */ +int dstmStartup(const char * option) { + pthread_t thread_Listen, udp_thread_Listen; + pthread_attr_t attr; + int master=option!=NULL && strcmp(option, "master")==0; + int fd; + int udpfd; + + if (processConfigFile() != 0) + return 0; //TODO: return error value, cause main program to exit +#ifdef COMPILER + if (!master) + threadcount--; +#endif + +#ifdef TRANSSTATS + printf("Trans stats is on\n"); + fflush(stdout); +#endif + + //Initialize socket pool + transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1); + transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1); + + dstmInit(); + transInit(); + + fd=startlistening(); + udpfd = udpInit(); + pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd); + if (master) { + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd); + return 1; + } else { + dstmListen((void *)fd); + return 0; + } +} + +//TODO Use this later +void *pCacheAlloc(objstr_t *store, unsigned int size) { + void *tmp; + objstr_t *ptr; + ptr = store; + int success = 0; + + while(ptr->next != NULL) { + /* check if store is empty */ + if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) { + tmp = ptr->top; + ptr->top += size; + success = 1; + return tmp; + } else { + ptr = ptr-> next; + } + } + + if(success == 0) { + return NULL; + } +} + +/* This function initiates the prefetch thread A queue is shared + * between the main thread of execution and the prefetch thread to + * process the prefetch call Call from compiler populates the shared + * queue with prefetch requests while prefetch thread processes the + * prefetch requests */ + +void transInit() { + int t, rc; + int retval; + //Create and initialize prefetch cache structure + prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE); + + /* Initialize attributes for mutex */ + pthread_mutexattr_init(&prefetchcache_mutex_attr); + pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); + + pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr); + + pthread_mutex_init(¬ifymutex, NULL); + pthread_mutex_init(&atomicObjLock, NULL); + //Create prefetch cache lookup table + if(prehashCreate(HASH_SIZE, LOADFACTOR)) { + printf("ERROR\n"); + return; //Failure + } + + //Initialize primary shared queue + queueInit(); + //Initialize machine pile w/prefetch oids and offsets shared queue + mcpileqInit(); + + //Create the primary prefetch thread + do { + retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); + } while(retval!=0); + pthread_detach(tPrefetch); +} + +/* This function stops the threads spawned */ +void transExit() { + int t; + pthread_cancel(tPrefetch); + for(t = 0; t < NUM_THREADS; t++) + pthread_cancel(wthreads[t]); + + return; +} + +/* This functions inserts randowm wait delays in the order of msec + * Mostly used when transaction commits retry*/ +void randomdelay() { + struct timespec req; + time_t t; + + t = time(NULL); + req.tv_sec = 0; + req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec + nanosleep(&req, NULL); + return; +} + +/* This function initializes things required in the transaction start*/ +transrecord_t *transStart() { + transrecord_t *tmp; + if((tmp = calloc(1, sizeof(transrecord_t))) == NULL){ + printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__); + return NULL; + } + tmp->cache = objstrCreate(1048576); + tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); +#ifdef COMPILER + tmp->revertlist=NULL; +#endif + return tmp; } /* This function finds the location of the objects involved in a transaction * and returns the pointer to the object if found in a remote location */ -objheader_t *transRead(transrecord_t *record, unsigned int oid) -{ - unsigned int machinenumber; - objheader_t *tmp, *objheader; - void *objcopy; - int size; - void *buf; - /* Search local cache */ - if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ - //LOCAL Object - objheader->status |= LOCAL; - //printf("DEBUG -> transRead oid %d found local\n", oid); - return(objheader); - } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { - /* Look up in machine lookup table and copy into cache*/ - //printf("oid is found in Local machinelookup\n"); - tmp = mhashSearch(oid); - size = sizeof(objheader_t)+classsize[tmp->type]; - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)tmp, size); - //LOCAL Object - ((objheader_t *) objcopy)->status |= LOCAL; - /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, objheader->oid, objcopy); - return(objcopy); - } else { /* If not found in machine look up */ - /* Get the object from the remote location */ - machinenumber = lhashSearch(oid); - objcopy = getRemoteObj(record, machinenumber, oid); - if(objcopy == NULL) { - //If object is not found in Remote location - //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber); - return NULL; - } - else { - //printf("Object oid = %d found in Machine %d\n", oid, machinenumber); - return(objcopy); - } - } +objheader_t *transRead(transrecord_t *record, unsigned int oid) { + unsigned int machinenumber; + objheader_t *tmp, *objheader; + objheader_t *objcopy; + int size, found = 0; + void *buf; + + if(oid == 0) { + return NULL; + } + + if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ + /* Search local transaction cache */ +#ifdef COMPILER + return &objheader[1]; +#else + return objheader; +#endif + } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { + /* Look up in machine lookup table and copy into cache*/ + GETSIZE(size, objheader); + size += sizeof(objheader_t); + objcopy = (objheader_t *) objstrAlloc(record->cache, size); + memcpy(objcopy, objheader, size); + /* Insert into cache's lookup table */ + STATUS(objcopy)=0; + chashInsert(record->lookupTable, OID(objheader), objcopy); +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif + } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { + /* Look up in prefetch cache */ + GETSIZE(size, tmp); + size+=sizeof(objheader_t); + objcopy = (objheader_t *) objstrAlloc(record->cache, size); + memcpy(objcopy, tmp, size); + /* Insert into cache's lookup table */ + chashInsert(record->lookupTable, OID(tmp), objcopy); +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif + } else { + /* Get the object from the remote location */ + if((machinenumber = lhashSearch(oid)) == 0) { + printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__); + return NULL; + } + objcopy = getRemoteObj(record, machinenumber, oid); + + if(objcopy == NULL) { + printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); + return NULL; + } else { + STATUS(objcopy)=0; +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif + } + } } + /* This function creates objects in the transaction record */ -objheader_t *transCreateObj(transrecord_t *record, unsigned short type) -{ - objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type])); - tmp->oid = getNewOID(); - tmp->type = type; - tmp->version = 1; - tmp->rcount = 0; //? not sure how to handle this yet - tmp->status = 0; - tmp->status |= NEW; - chashInsert(record->lookupTable, tmp->oid, tmp); - return tmp; +objheader_t *transCreateObj(transrecord_t *record, unsigned int size) { + objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size)); + OID(tmp) = getNewOID(); + tmp->version = 1; + tmp->rcount = 1; + STATUS(tmp) = NEW; + chashInsert(record->lookupTable, OID(tmp), tmp); + +#ifdef COMPILER + return &tmp[1]; //want space after object header +#else + return tmp; +#endif } + /* This function creates machine piles based on all machines involved in a * transaction commit request */ plistnode_t *createPiles(transrecord_t *record) { - int i = 0; - unsigned int size;/* Represents number of bins in the chash table */ - chashlistnode_t *curr, *ptr, *next; - plistnode_t *pile = NULL; - unsigned int machinenum; - objheader_t *headeraddr; - - ptr = record->lookupTable->table; - size = record->lookupTable->size; - - for(i = 0; i < size ; i++) { - curr = &ptr[i]; - /* Inner loop to traverse the linked list of the cache lookupTable */ - while(curr != NULL) { - //if the first bin in hash table is empty - if(curr->key == 0) { - break; - } - next = curr->next; - //Get machine location for object id - - if ((machinenum = lhashSearch(curr->key)) == 0) { - printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); - return NULL; - } - - if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { - printf("Error: No such oid %s, %d\n", __FILE__, __LINE__); - return NULL; - } - //Make machine groups - if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) { - printf("pInsert error %s, %d\n", __FILE__, __LINE__); - return NULL; - } - /* Check if local */ - if((headeraddr->status & LOCAL) == LOCAL) { - pile->local = 1; //True i.e. local - } - curr = next; - } - } - - return pile; + int i; + plistnode_t *pile = NULL; + unsigned int machinenum; + objheader_t *headeraddr; + chashlistnode_t * ptr = record->lookupTable->table; + /* Represents number of bins in the chash table */ + unsigned int size = record->lookupTable->size; + + for(i = 0; i < size ; i++) { + chashlistnode_t * curr = &ptr[i]; + /* Inner loop to traverse the linked list of the cache lookupTable */ + while(curr != NULL) { + //if the first bin in hash table is empty + if(curr->key == 0) + break; + + if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) { + printf("Error: No such oid %s, %d\n", __FILE__, __LINE__); + return NULL; + } + + //Get machine location for object id (and whether local or not) + if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) { + machinenum = myIpAddr; + } else if ((machinenum = lhashSearch(curr->key)) == 0) { + printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); + return NULL; + } + + //Make machine groups + pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements); + curr = curr->next; + } + } + return pile; } + /* This function initiates the transaction commit process * Spawns threads for each of the new connections with Participants - * and creates new piles by calling the createPiles(), - * Fills the piles with necesaary information and - * Sends a transrequest() to each pile*/ + * and creates new piles by calling the createPiles(), + * Sends a transrequest() to each remote machines for objects found remotely + * and calls handleLocalReq() to process objects found locally */ int transCommit(transrecord_t *record) { - unsigned int tot_bytes_mod, *listmid; - plistnode_t *pile; - int i, rc, val; - int pilecount = 0, offset, threadnum = 0, trecvcount = 0, tmachcount = 0; - char buffer[RECEIVE_BUFFER_SIZE],control; - char transid[TID_LEN]; - trans_req_data_t *tosend; - trans_commit_data_t transinfo; - static int newtid = 0; - char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */ - char localstat = 0; - - /* Look through all the objects in the transaction record and make piles - * for each machine involved in the transaction*/ - pile = createPiles(record); - - /* Create the packet to be sent in TRANS_REQUEST */ - - /* Count the number of participants */ - pilecount = pCount(pile); - - /* Create a list of machine ids(Participants) involved in transaction */ - if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; - } - pListMid(pile, listmid); - - - /* Initialize thread variables, - * Spawn a thread for each Participant involved in a transaction */ - pthread_t thread[pilecount]; - pthread_attr_t attr; - pthread_cond_t tcond; - pthread_mutex_t tlock; - pthread_mutex_t tlshrd; - - thread_data_array_t *thread_data_array; - thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount); - local_thread_data_array_t *ltdata; - if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; + unsigned int tot_bytes_mod, *listmid; + plistnode_t *pile, *pile_ptr; + int i, j, rc, val; + int pilecount, offset, threadnum, trecvcount; + char control; + char transid[TID_LEN]; + trans_req_data_t *tosend; + trans_commit_data_t transinfo; + static int newtid = 0; + char treplyctrl, treplyretry; /* keeps track of the common response that needs to be sent */ + thread_data_array_t *thread_data_array; + local_thread_data_array_t *ltdata; + int firsttime=1; + + do { + treplyctrl=0; + trecvcount = 0; + threadnum = 0; + treplyretry = 0; + thread_data_array = NULL; + ltdata = NULL; + + /* Look through all the objects in the transaction record and make piles + * for each machine involved in the transaction*/ + if (firsttime) + pile_ptr = pile = createPiles(record); + else + pile=pile_ptr; + firsttime=0; + + /* Create the packet to be sent in TRANS_REQUEST */ + + /* Count the number of participants */ + pilecount = pCount(pile); + + /* Create a list of machine ids(Participants) involved in transaction */ + listmid = calloc(pilecount, sizeof(unsigned int)); + pListMid(pile, listmid); + + + /* Initialize thread variables, + * Spawn a thread for each Participant involved in a transaction */ + pthread_t thread[pilecount]; + pthread_attr_t attr; + pthread_cond_t tcond; + pthread_mutex_t tlock; + pthread_mutex_t tlshrd; + + thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t)); + + ltdata = calloc(1, sizeof(local_thread_data_array_t)); + + thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ + + /* Initialize and set thread detach attribute */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_mutex_init(&tlock, NULL); + pthread_cond_init(&tcond, NULL); + + /* Process each machine pile */ + while(pile != NULL) { + //Create transaction id + newtid++; + tosend = calloc(1, sizeof(trans_req_data_t)); + tosend->f.control = TRANS_REQUEST; + sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); + tosend->f.mcount = pilecount; + tosend->f.numread = pile->numread; + tosend->f.nummod = pile->nummod; + tosend->f.numcreated = pile->numcreated; + tosend->f.sum_bytes = pile->sum_bytes; + tosend->listmid = listmid; + tosend->objread = pile->objread; + tosend->oidmod = pile->oidmod; + tosend->oidcreated = pile->oidcreated; + thread_data_array[threadnum].thread_id = threadnum; + thread_data_array[threadnum].mid = pile->mid; + thread_data_array[threadnum].buffer = tosend; + thread_data_array[threadnum].recvmsg = rcvd_control_msg; + thread_data_array[threadnum].threshold = &tcond; + thread_data_array[threadnum].lock = &tlock; + thread_data_array[threadnum].count = &trecvcount; + thread_data_array[threadnum].replyctrl = &treplyctrl; + thread_data_array[threadnum].replyretry = &treplyretry; + thread_data_array[threadnum].rec = record; + /* If local do not create any extra connection */ + if(pile->mid != myIpAddr) { /* Not local */ + do { + rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]); + } while(rc!=0); + if(rc) { + perror("Error in pthread create\n"); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); + free(thread_data_array); + free(ltdata); + return 1; } - - thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ - - /* Initialize and set thread detach attribute */ - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_mutex_init(&tlock, NULL); - pthread_cond_init(&tcond, NULL); - - /* Process each machine pile */ - while(pile != NULL) { - //Create transaction id - newtid++; - //trans_req_data_t *tosend; - if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; - } - tosend->f.control = TRANS_REQUEST; - sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); - tosend->f.mcount = pilecount; - tosend->f.numread = pile->numread; - printf("DEBUG-> pile numread = %d\n", pile->numread); - tosend->f.nummod = pile->nummod; - tosend->f.sum_bytes = pile->sum_bytes; - tosend->listmid = listmid; - tosend->objread = pile->objread; - tosend->oidmod = pile->oidmod; - thread_data_array[threadnum].thread_id = threadnum; - thread_data_array[threadnum].mid = pile->mid; - thread_data_array[threadnum].pilecount = pilecount; - thread_data_array[threadnum].buffer = tosend; - thread_data_array[threadnum].recvmsg = rcvd_control_msg; - thread_data_array[threadnum].threshold = &tcond; - thread_data_array[threadnum].lock = &tlock; - thread_data_array[threadnum].count = &trecvcount; - //thread_data_array[threadnum].localstatus = &localstat; - thread_data_array[threadnum].replyctrl = &treplyctrl; - thread_data_array[threadnum].replyretry = &treplyretry; - thread_data_array[threadnum].rec = record; - /* If local do not create any extra connection */ - if(pile->local != 1) { - rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]); - if (rc) { - perror("Error in pthread create\n"); - return 1; - } - } else { - /*Unset the pile->local flag*/ - pile->local = 0; - //header->status &= ~(LOCK); - /*Handle request of local pile */ - /*Set flag to identify that Local machine is involved*/ - ltdata->tdata = &thread_data_array[threadnum]; - printf("DEBUG->Address of ltdata sent = %x\n", <data); - ltdata->transinfo = &transinfo; - printf("DEBUG-> Machine Pile numread = %d\n", ltdata->tdata->buffer->f.numread); - val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) <data); - if (val) { - perror("Error in pthread create\n"); - return 1; - } - } - threadnum++; - pile = pile->next; + } else { /*Local*/ + ltdata->tdata = &thread_data_array[threadnum]; + ltdata->transinfo = &transinfo; + do { + val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata); + } while(val!=0); + if(val) { + perror("Error in pthread create\n"); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); + free(thread_data_array); + free(ltdata); + return 1; } - - /* Free attribute and wait for the other threads */ - pthread_attr_destroy(&attr); - for (i = 0 ;i < pilecount ; i++) { - rc = pthread_join(thread[i], NULL); - if (rc) - { - printf("ERROR return code from pthread_join() is %d\n", rc); - return 1; - } + } + + threadnum++; + pile = pile->next; + } + /* Free attribute and wait for the other threads */ + pthread_attr_destroy(&attr); + + for (i = 0; i < threadnum; i++) { + rc = pthread_join(thread[i], NULL); + if(rc) + { + printf("Error: return code from pthread_join() is %d\n", rc); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (j = i; j < threadnum; j++) { + free(thread_data_array[j].buffer); + } + return 1; } - - /* Free resources */ - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - free(tosend); - free(listmid); - pDelete(pile); - free(thread_data_array); - free(ltdata); - - /* Retry trans commit procedure if not sucessful in the first try */ - if(treplyretry == 1) { - /* wait a random amount of time */ - randomdelay(); - //sleep(1); - /* Retry the commiting transaction again */ - transCommit(record); - } - - return 0; + free(thread_data_array[i].buffer); + } + + /* Free resources */ + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + free(listmid); + + if (!treplyretry) + pDelete(pile_ptr); + + /* wait a random amount of time before retrying to commit transaction*/ + if(treplyretry) { + free(thread_data_array); + free(ltdata); + randomdelay(); + } + + /* Retry trans commit procedure during soft_abort case */ + } while (treplyretry); + + if(treplyctrl == TRANS_ABORT) { +#ifdef TRANSSTATS + ++numTransAbort; +#endif + /* Free Resources */ + objstrDelete(record->cache); + chashDelete(record->lookupTable); + free(record); + free(thread_data_array); + free(ltdata); + return TRANS_ABORT; + } else if(treplyctrl == TRANS_COMMIT) { +#ifdef TRANSSTATS + ++numTransCommit; +#endif + /* Free Resources */ + objstrDelete(record->cache); + chashDelete(record->lookupTable); + free(record); + free(thread_data_array); + free(ltdata); + return 0; + } else { + //TODO Add other cases + printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__); + exit(-1); + } + return 0; } -/* This function sends information involved in the transaction request and - * accepts a response from particpants. +/* This function sends information involved in the transaction request + * to participants and accepts a response from particpants. * It calls decideresponse() to decide on what control message - * to send next and sends the message using sendResponse()*/ + * to send next to participants and sends the message using sendResponse()*/ void *transRequest(void *threadarg) { - int sd, i, n; - struct sockaddr_in serv_addr; - struct hostent *server; - thread_data_array_t *tdata; - objheader_t *headeraddr; - char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; - char machineip[16], retval; - - tdata = (thread_data_array_t *) threadarg; - - /* Send Trans Request */ - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("Error in socket for TRANS_REQUEST\n"); - return NULL; + int sd, i, n; + struct sockaddr_in serv_addr; + thread_data_array_t *tdata; + objheader_t *headeraddr; + char control, recvcontrol; + char machineip[16], retval; + + tdata = (thread_data_array_t *) threadarg; + + /* Send Trans Request */ + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("Error in socket for TRANS_REQUEST\n"); + pthread_exit(NULL); + } + bzero((char*) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(LISTEN_PORT); + serv_addr.sin_addr.s_addr = htonl(tdata->mid); + + /* Open Connection */ + if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + perror("Error in connect for TRANS_REQUEST\n"); + close(sd); + pthread_exit(NULL); + } + + /* Send bytes of data with TRANS_REQUEST control message */ + send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t)); + + /* Send list of machines involved in the transaction */ + { + int size=sizeof(unsigned int)*tdata->buffer->f.mcount; + send_data(sd, tdata->buffer->listmid, size); + } + + /* Send oids and version number tuples for objects that are read */ + { + int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread; + send_data(sd, tdata->buffer->objread, size); + } + + /* Send objects that are modified */ + for(i = 0; i < tdata->buffer->f.nummod ; i++) { + int size; + headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); + GETSIZE(size,headeraddr); + size+=sizeof(objheader_t); + send_data(sd, headeraddr, size); + } + + /* Read control message from Participant */ + recv_data(sd, &control, sizeof(char)); + /* Recv Objects if participant sends TRANS_DISAGREE */ + if(control == TRANS_DISAGREE) { + int length; + recv_data(sd, &length, sizeof(int)); + void *newAddr; + pthread_mutex_lock(&prefetchcache_mutex); + if ((newAddr = objstrAlloc(prefetchcache, length)) == NULL) { + printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + close(sd); + pthread_exit(NULL); + } + pthread_mutex_unlock(&prefetchcache_mutex); + recv_data(sd, newAddr, length); + int offset = 0; + while(length != 0) { + unsigned int oidToPrefetch; + objheader_t * header; + header = (objheader_t *) (((char *)newAddr) + offset); + oidToPrefetch = OID(header); + int size = 0; + GETSIZE(size, header); + size += sizeof(objheader_t); + //make an entry in prefetch hash table + void *oldptr; + if((oldptr = prehashSearch(oidToPrefetch)) != NULL) { + prehashRemove(oidToPrefetch); + prehashInsert(oidToPrefetch, header); + } else { + prehashInsert(oidToPrefetch, header); + } + length = length - size; + offset += size; + } + } + recvcontrol = control; + /* Update common data structure and increment count */ + tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; + + /* Lock and update count */ + /* Thread sleeps until all messages from pariticipants are received by coordinator */ + pthread_mutex_lock(tdata->lock); + + (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */ + + /* Wake up the threads and invoke decideResponse (once) */ + if(*(tdata->count) == tdata->buffer->f.mcount) { + decideResponse(tdata); + pthread_cond_broadcast(tdata->threshold); + } else { + pthread_cond_wait(tdata->threshold, tdata->lock); + } + pthread_mutex_unlock(tdata->lock); + + /* Send the final response such as TRANS_COMMIT or TRANS_ABORT + * to all participants in their respective socket */ + if (sendResponse(tdata, sd) == 0) { + printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); + close(sd); + pthread_exit(NULL); + } + + recv_data((int)sd, &control, sizeof(char)); + + if(control == TRANS_UNSUCESSFUL) { + //printf("DEBUG-> TRANS_ABORTED\n"); + } else if(control == TRANS_SUCESSFUL) { + //printf("DEBUG-> TRANS_SUCCESSFUL\n"); + } else { + //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control); + } + + /* Close connection */ + close(sd); + pthread_exit(NULL); +} + +/* This function decides the reponse that needs to be sent to + * all Participant machines after the TRANS_REQUEST protocol */ +void decideResponse(thread_data_array_t *tdata) { + char control; + int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what + message to send */ + + for (i = 0 ; i < tdata->buffer->f.mcount; i++) { + control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses + written onto the shared array */ + switch(control) { + default: + printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); + /* treat as disagree, pass thru */ + case TRANS_DISAGREE: + transdisagree++; + break; + + case TRANS_AGREE: + transagree++; + break; + + case TRANS_SOFT_ABORT: + transsoftabort++; + break; + } + } + + if(transdisagree > 0) { + /* Send Abort */ + *(tdata->replyctrl) = TRANS_ABORT; + *(tdata->replyretry) = 0; + /* clear objects from prefetch cache */ + for (i = 0; i < tdata->buffer->f.numread; i++) { + prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i))); + } + for (i = 0; i < tdata->buffer->f.nummod; i++) { + prehashRemove(tdata->buffer->oidmod[i]); + } + } else if(transagree == tdata->buffer->f.mcount){ + /* Send Commit */ + *(tdata->replyctrl) = TRANS_COMMIT; + *(tdata->replyretry) = 0; + /* update prefetch cache */ + /* For objects read */ + char oidType; + int retval; + oidType = 'R'; + if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) { + printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + return; + } + oidType = 'M'; + if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) { + printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + return; + } + /* Invalidate objects in other machine cache */ + if(tdata->buffer->f.nummod > 0) { + if((retval = invalidateObj(tdata)) != 0) { + printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__); + return; + } + } + } else { + /* Send Abort in soft abort case followed by retry commiting transaction again*/ + *(tdata->replyctrl) = TRANS_ABORT; + *(tdata->replyretry) = 1; + } + return; +} + +/* This function updates the prefetch cache when commiting objects + * based on the type of oid i.e. if oid is read or oid is modified + * Return -1 on error else returns 0 + */ +int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) { + int i; + for (i = 0; i < numoid; i++) { + //find address object + objheader_t *header, *newAddr; + int size; + unsigned int oid; + if(oidType == 'R') { + oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i)); + } else { + oid = tdata->buffer->oidmod[i]; + } + pthread_mutex_lock(&prefetchcache_mutex); + header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid); + //copy object into prefetch cache + GETSIZE(size, header); + if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) { + printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } + pthread_mutex_unlock(&prefetchcache_mutex); + memcpy(newAddr, header, (size + sizeof(objheader_t))); + //make an entry in prefetch hash table + void *oldptr; + if((oldptr = prehashSearch(oid)) != NULL) { + prehashRemove(oid); + prehashInsert(oid, newAddr); + } else { + prehashInsert(oid, newAddr); + } + } + return 0; +} + + +/* This function sends the final response to remote machines per + * thread in their respective socket id It returns a char that is only + * needed to check the correctness of execution of this function + * inside transRequest()*/ + +char sendResponse(thread_data_array_t *tdata, int sd) { + int n, size, sum, oidcount = 0, control; + char *ptr, retval = 0; + unsigned int *oidnotfound; + + control = *(tdata->replyctrl); + send_data(sd, &control, sizeof(char)); + + //TODO read missing objects during object migration + /* If response is a soft abort due to missing objects at the + Participant's side */ + + /* If the decided response is TRANS_ABORT */ + if(*(tdata->replyctrl) == TRANS_ABORT) { + retval = TRANS_ABORT; + } else if(*(tdata->replyctrl) == TRANS_COMMIT) { + /* If the decided response is TRANS_COMMIT */ + retval = TRANS_COMMIT; + } + + return retval; +} + +/* This function opens a connection, places an object read request to + * the remote machine, reads the control message and object if + * available and copies the object and its header to the local + * cache. */ + +void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { + int size, val; + struct sockaddr_in serv_addr; + char machineip[16]; + char control; + objheader_t *h; + void *objcopy = NULL; + + int sd = getSock2(transReadSockPool, mnum); + char readrequest[sizeof(char)+sizeof(unsigned int)]; + readrequest[0] = READ_REQUEST; + *((unsigned int *)(&readrequest[1])) = oid; + send_data(sd, readrequest, sizeof(readrequest)); + + /* Read response from the Participant */ + recv_data(sd, &control, sizeof(char)); + + if (control==OBJECT_NOT_FOUND) { + objcopy = NULL; + } else { + /* Read object if found into local cache */ + recv_data(sd, &size, sizeof(int)); + objcopy = objstrAlloc(record->cache, size); + recv_data(sd, objcopy, size); + /* Insert into cache's lookup table */ + chashInsert(record->lookupTable, oid, objcopy); + } + + return objcopy; +} + +/* This function handles the local objects involved in a transaction + * commiting process. It also makes a decision if this local machine + * sends AGREE or DISAGREE or SOFT_ABORT to coordinator. Note + * Coordinator = local machine It wakes up the other threads from + * remote participants that are waiting for the coordinator's decision + * and based on common agreement it either commits or aborts the + * transaction. It also frees the memory resources */ + +void *handleLocalReq(void *threadarg) { + unsigned int *oidnotfound = NULL, *oidlocked = NULL; + local_thread_data_array_t *localtdata; + int numoidnotfound = 0, numoidlocked = 0; + int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; + int numread, i; + unsigned int oid; + unsigned short version; + void *mobj; + objheader_t *headptr; + + localtdata = (local_thread_data_array_t *) threadarg; + + /* Counters and arrays to formulate decision on control message to be sent */ + oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); + oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); + + numread = localtdata->tdata->buffer->f.numread; + /* Process each oid in the machine pile/ group per thread */ + for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) { + if (i < localtdata->tdata->buffer->f.numread) { + int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array + incr *= i; + oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr)); + version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int))); + } else { // Objects Modified + int tmpsize; + headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]); + if (headptr == NULL) { + printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__); + return NULL; + } + oid = OID(headptr); + version = headptr->version; + } + /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ + + /* Save the oids not found and number of oids not found for later use */ + if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ + /* Save the oids not found and number of oids not found for later use */ + oidnotfound[numoidnotfound] = oid; + numoidnotfound++; + } else { /* If Obj found in machine (i.e. has not moved) */ + /* Check if Obj is locked by any previous transaction */ + if (test_and_set(STATUSPTR(mobj))) { + if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ + v_matchlock++; + } else {/* If versions don't match ...HARD ABORT */ + v_nomatch++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + } + } else { + //we're locked + /* Save all object oids that are locked on this machine during this transaction request call */ + oidlocked[numoidlocked] = OID(((objheader_t *)mobj)); + numoidlocked++; + if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ + v_matchnolock++; + } else { /* If versions don't match ...HARD ABORT */ + v_nomatch++; + /* Send TRANS_DISAGREE to Coordinator */ + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; + } + } + } + } // End for + /* Condition to send TRANS_AGREE */ + if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) { + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; + } + /* Condition to send TRANS_SOFT_ABORT */ + if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) { + localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT; + } + + /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process + * if Participant receives a TRANS_COMMIT */ + localtdata->transinfo->objlocked = oidlocked; + localtdata->transinfo->objnotfound = oidnotfound; + localtdata->transinfo->modptr = NULL; + localtdata->transinfo->numlocked = numoidlocked; + localtdata->transinfo->numnotfound = numoidnotfound; + /* Lock and update count */ + //Thread sleeps until all messages from pariticipants are received by coordinator + pthread_mutex_lock(localtdata->tdata->lock); + (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */ + + /* Wake up the threads and invoke decideResponse (once) */ + if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) { + decideResponse(localtdata->tdata); + pthread_cond_broadcast(localtdata->tdata->threshold); + } else { + pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock); + } + pthread_mutex_unlock(localtdata->tdata->lock); + if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){ + if(transAbortProcess(localtdata) != 0) { + printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); + pthread_exit(NULL); + } + } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) { + if(transComProcess(localtdata) != 0) { + printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); + pthread_exit(NULL); + } + } + /* Free memory */ + if (localtdata->transinfo->objlocked != NULL) { + free(localtdata->transinfo->objlocked); + } + if (localtdata->transinfo->objnotfound != NULL) { + free(localtdata->transinfo->objnotfound); + } + + pthread_exit(NULL); +} + +/* This function completes the ABORT process if the transaction is aborting */ +int transAbortProcess(local_thread_data_array_t *localtdata) { + int i, numlocked; + unsigned int *objlocked; + void *header; + + numlocked = localtdata->transinfo->numlocked; + objlocked = localtdata->transinfo->objlocked; + + for (i = 0; i < numlocked; i++) { + if((header = mhashSearch(objlocked[i])) == NULL) { + printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + UnLock(STATUSPTR(header)); + } + + return 0; +} + +/*This function completes the COMMIT process is the transaction is commiting*/ +int transComProcess(local_thread_data_array_t *localtdata) { + objheader_t *header, *tcptr; + int i, nummod, tmpsize, numcreated, numlocked; + unsigned int *oidmod, *oidcreated, *oidlocked; + void *ptrcreate; + + nummod = localtdata->tdata->buffer->f.nummod; + oidmod = localtdata->tdata->buffer->oidmod; + numcreated = localtdata->tdata->buffer->f.numcreated; + oidcreated = localtdata->tdata->buffer->oidcreated; + numlocked = localtdata->transinfo->numlocked; + oidlocked = localtdata->transinfo->objlocked; + + for (i = 0; i < nummod; i++) { + if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { + printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + /* Copy from transaction cache -> main object store */ + if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) { + printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + GETSIZE(tmpsize, header); + char *tmptcptr = (char *) tcptr; + memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize); + header->version += 1; + if(header->notifylist != NULL) { + notifyAll(&header->notifylist, OID(header), header->version); + } + } + /* If object is newly created inside transaction then commit it */ + for (i = 0; i < numcreated; i++) { + if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) { + printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__); + return 1; + } + GETSIZE(tmpsize, header); + tmpsize += sizeof(objheader_t); + pthread_mutex_lock(&mainobjstore_mutex); + if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) { + printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&mainobjstore_mutex); + return 1; + } + pthread_mutex_unlock(&mainobjstore_mutex); + memcpy(ptrcreate, header, tmpsize); + mhashInsert(oidcreated[i], ptrcreate); + lhashInsert(oidcreated[i], myIpAddr); + } + /* Unlock locked objects */ + for(i = 0; i < numlocked; i++) { + if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { + printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + UnLock(STATUSPTR(header)); + } + + return 0; +} + +prefetchpile_t *foundLocal(char *ptr) { + int ntuples = *(GET_NTUPLES(ptr)); + unsigned int * oidarray = GET_PTR_OID(ptr); + unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples); + short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + prefetchpile_t * head=NULL; + + int i; + for(i=0;i NUMCLASSES) { + int elementsize = classsize[TYPE(header)]; + struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t)); + int length = ao->___length___; + /* Check if array out of bounds */ + if(offset < 0 || offset >= length) { + //if yes treat the object as found + (*oid)=0; + return 1; + } + (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset))); + return 1; + } else { + (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset)); + return 1; + } +} + + +/* This function is called by the thread calling transPrefetch */ +void *transPrefetch(void *t) { + while(1) { + /* lock mutex of primary prefetch queue */ + void *node=gettail(); + /* Check if the tuples are found locally, if yes then reduce them further*/ + /* and group requests by remote machine ids by calling the makePreGroups() */ + prefetchpile_t *pilehead = foundLocal(node); + + if (pilehead!=NULL) { + // Get sock from shared pool + int sd = getSock2(transPrefetchSockPool, pilehead->mid); + + /* Send Prefetch Request */ + prefetchpile_t *ptr = pilehead; + while(ptr != NULL) { + sendPrefetchReq(ptr, sd); + ptr = ptr->next; + } + + /* Release socket */ + // freeSock(transPrefetchSockPool, pilehead->mid, sd); + + /* Deallocated pilehead */ + mcdealloc(pilehead); + } + // Deallocate the prefetch queue pile node + inctail(); + } +} + +void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) { + objpile_t *tmp; + + int size=sizeof(char)+sizeof(int); + for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) { + size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + } + + char buft[size]; + char *buf=buft; + *buf=TRANS_PREFETCH; + buf+=sizeof(char); + + for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) { + int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + *((int*)buf)=len; + buf+=sizeof(int); + *((unsigned int *)buf)=tmp->oid; + buf+=sizeof(unsigned int); + *((unsigned int *)(buf)) = myIpAddr; + buf+=sizeof(unsigned int); + memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short)); + buf+=tmp->numoffset*sizeof(short); + } + *((int *)buf)=-1; + send_data(sd, buft, size); + return; +} + +void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { + int len, endpair; + char control; + objpile_t *tmp; + + /* Send TRANS_PREFETCH control message */ + control = TRANS_PREFETCH; + send_data(sd, &control, sizeof(char)); + + /* Send Oids and offsets in pairs */ + tmp = mcpilenode->objpiles; + while(tmp != NULL) { + len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + char oidnoffset[len]; + char *buf=oidnoffset; + *((int*)buf) = tmp->numoffset; + buf+=sizeof(int); + *((unsigned int *)buf) = tmp->oid; + buf+=sizeof(unsigned int); + *((unsigned int *)buf) = myIpAddr; + buf += sizeof(unsigned int); + memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short)); + send_data(sd, oidnoffset, len); + tmp = tmp->next; + } + + /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */ + endpair = -1; + send_data(sd, &endpair, sizeof(int)); + + return; +} + +int getPrefetchResponse(int sd) { + int length = 0, size = 0; + char control; + unsigned int oid; + void *modptr, *oldptr; + + recv_data((int)sd, &length, sizeof(int)); + size = length - sizeof(int); + char recvbuffer[size]; + + recv_data((int)sd, recvbuffer, size); + control = *((char *) recvbuffer); + if(control == OBJECT_FOUND) { + oid = *((unsigned int *)(recvbuffer + sizeof(char))); + //printf("oid %d found\n",oid); + size = size - (sizeof(char) + sizeof(unsigned int)); + pthread_mutex_lock(&prefetchcache_mutex); + if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) { + printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); + return -1; + } + pthread_mutex_unlock(&prefetchcache_mutex); + memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size); + STATUS(modptr)=0; + + /* Insert the oid and its address into the prefetch hash lookup table */ + /* Do a version comparison if the oid exists */ + if((oldptr = prehashSearch(oid)) != NULL) { + /* If older version then update with new object ptr */ + if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) { + prehashRemove(oid); + prehashInsert(oid, modptr); + } + } else {/* Else add the object ptr to hash table*/ + prehashInsert(oid, modptr); + } + /* Lock the Prefetch Cache look up table*/ + pthread_mutex_lock(&pflookup.lock); + /* Broadcast signal on prefetch cache condition variable */ + pthread_cond_broadcast(&pflookup.cond); + /* Unlock the Prefetch Cache look up table*/ + pthread_mutex_unlock(&pflookup.lock); + } else if(control == OBJECT_NOT_FOUND) { + oid = *((unsigned int *)(recvbuffer + sizeof(char))); + /* TODO: For each object not found query DHT for new location and retrieve the object */ + /* Throw an error */ + printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid); + // exit(-1); + } else { + printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__); + } + + return 0; +} + +unsigned short getObjType(unsigned int oid) { + objheader_t *objheader; + unsigned short numoffset[] ={0}; + short fieldoffset[] ={}; + + if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) { + if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { + prefetch(1, &oid, numoffset, fieldoffset); + pthread_mutex_lock(&pflookup.lock); + while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { + pthread_cond_wait(&pflookup.cond, &pflookup.lock); } - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(LISTEN_PORT); - midtoIP(tdata->mid,machineip); - machineip[15] = '\0'; - serv_addr.sin_addr.s_addr = inet_addr(machineip); - /* Open Connection */ - if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("Error in connect for TRANS_REQUEST\n"); - return NULL; + pthread_mutex_unlock(&pflookup.lock); + } + } + + return TYPE(objheader); +} + +int startRemoteThread(unsigned int oid, unsigned int mid) +{ + int sock; + struct sockaddr_in remoteAddr; + char msg[1 + sizeof(unsigned int)]; + int bytesSent; + int status; + + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) + { + perror("startRemoteThread():socket()"); + return -1; } + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); - printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip); - /* Send bytes of data with TRANS_REQUEST control message */ - if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) { - perror("Error sending fixed bytes for thread\n"); - return NULL; - } - /* Send list of machines involved in the transaction */ + if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) { - int size=sizeof(unsigned int)*tdata->pilecount; - if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) { - perror("Error sending list of machines for thread\n"); - return NULL; - } + printf("startRemoteThread():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + status = -1; } - /* Send oids and version number tuples for objects that are read */ + else { - int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread; - if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) { - perror("Error sending tuples for thread\n"); - return NULL; - } - } - /* Send objects that are modified */ - for(i = 0; i < tdata->buffer->f.nummod ; i++) { - int size; - headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); - size=sizeof(objheader_t)+classsize[headeraddr->type]; - if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) { - perror("Error sending obj modified for thread\n"); - return NULL; - } + msg[0] = START_REMOTE_THREAD; + *((unsigned int *) &msg[1]) = oid; + send_data(sock, msg, 1 + sizeof(unsigned int)); } - /* Read control message from Participant */ - if((n = read(sd, &control, sizeof(char))) <= 0) { - perror("Error in reading control message from Participant\n"); - return NULL; - } - recvcontrol = control; - - /* Update common data structure and increment count */ - tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol; - - /* Lock and update count */ - //Thread sleeps until all messages from pariticipants are received by coordinator - pthread_mutex_lock(tdata->lock); - - (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */ + close(sock); + return status; +} - /* Wake up the threads and invoke decideResponse (once) */ -/* - if((*(tdata->localstatus) & LM_EXISTS) == LM_EXISTS) { //If there is a local machine involved in the transaction - if(*(tdata->count) == tdata->pilecount - 1) { - while(*(tdata->localstatus) & LM_UPDATED != LM_UPDATED) { - ;//Do nothing and wait until Local machine thread updates the common data structure - } - if(decideResponse(tdata) != 0) { - printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(tdata->lock); - return NULL; - } - pthread_cond_broadcast(tdata->threshold); - } - } else if ((*(tdata->localstatus) & LM_EXISTS) == 0) { //No local m/c involved in transaction - if(*(tdata->count) == tdata->pilecount) { - if (decideResponse(tdata) != 0) { - printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(tdata->lock); - close(sd); - return NULL; - } - pthread_cond_broadcast(tdata->threshold); - } else { - pthread_cond_wait(tdata->threshold, tdata->lock); - } - } -*/ - - if(*(tdata->count) == tdata->pilecount) { - if (decideResponse(tdata) != 0) { - printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(tdata->lock); - close(sd); - return NULL; - } - pthread_cond_broadcast(tdata->threshold); - } else { - pthread_cond_wait(tdata->threshold, tdata->lock); - } - pthread_mutex_unlock(tdata->lock); - - /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t - * to all participants in their respective socket */ - if (sendResponse(tdata, sd) == 0) { - printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(tdata->lock); - close(sd); - return NULL; +//TODO: when reusing oids, make sure they are not already in use! +static unsigned int id = 0xFFFFFFFF; +unsigned int getNewOID(void) { + id += 2; + if (id > oidMax || id < oidMin) + { + id = (oidMin | 1); } - - /* Close connection */ - close(sd); - pthread_exit(NULL); + return id; } -/* This function decides the reponse that needs to be sent to - * all Participant machines involved in the transaction commit */ -int decideResponse(thread_data_array_t *tdata) { - char control; - int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what - message to send */ - - //Check common data structure - for (i = 0 ; i < tdata->pilecount ; i++) { - /*Switch on response from Participant */ - control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses - written onto the shared array */ - switch(control) { - case TRANS_DISAGREE: - printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n"); - transdisagree++; - break; - - case TRANS_AGREE: - printf("DEBUG-> trans.c Recv TRANS_AGREE\n"); - transagree++; - break; - - case TRANS_SOFT_ABORT: - printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n"); - transsoftabort++; - break; - default: - printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); - return -1; - } - } +int processConfigFile() +{ + FILE *configFile; + const int maxLineLength = 200; + char lineBuffer[maxLineLength]; + char *token; + const char *delimiters = " \t\n"; + char *commentBegin; + in_addr_t tmpAddr; - /* Decide what control message to send to Participant */ - if(transdisagree > 0) { - /* Send Abort */ - *(tdata->replyctrl) = TRANS_ABORT; - printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); - objstrDelete(tdata->rec->cache); - chashDelete(tdata->rec->lookupTable); - free(tdata->rec); - } else if(transagree == tdata->pilecount){ - /* Send Commit */ - *(tdata->replyctrl) = TRANS_COMMIT; - printf("DEBUG-> trans.c Sending TRANS_COMMIT\n"); - objstrDelete(tdata->rec->cache); - chashDelete(tdata->rec->lookupTable); - free(tdata->rec); - } else if(transsoftabort > 0 && transdisagree == 0) { - /* Send Abort in soft abort case followed by retry commiting transaction again*/ - *(tdata->replyctrl) = TRANS_ABORT; - *(tdata->replyretry) = 1; - printf("DEBUG-> trans.c Sending TRANS_ABORT\n"); - } else { - printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__); + configFile = fopen(CONFIG_FILENAME, "r"); + if (configFile == NULL) + { + printf("error opening %s:\n", CONFIG_FILENAME); + perror(""); return -1; } + + numHostsInSystem = 0; + sizeOfHostArray = 8; + hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int)); - return 0; -} -/* This function sends the final response to all threads in their respective socket id */ -char sendResponse(thread_data_array_t *tdata, int sd) { - int n, N, sum, oidcount = 0; - char *ptr, retval = 0; - unsigned int *oidnotfound; - - /* If the decided response is due to a soft abort and missing objects at the Participant's side */ - if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) { - /* Read list of objects missing */ - if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) { - N = oidcount * sizeof(unsigned int); - if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); + while(fgets(lineBuffer, maxLineLength, configFile) != NULL) + { + commentBegin = strchr(lineBuffer, '#'); + if (commentBegin != NULL) + *commentBegin = '\0'; + token = strtok(lineBuffer, delimiters); + while (token != NULL) + { + tmpAddr = inet_addr(token); + if ((int)tmpAddr == -1) + { + printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token); + fclose(configFile); + return -1; } - ptr = (char *) oidnotfound; - do { - n = read(sd, ptr+sum, N-sum); - sum += n; - } while(sum < N && n !=0); + else + addHost(htonl(tmpAddr)); + token = strtok(NULL, delimiters); } - retval = TRANS_SOFT_ABORT; - } - /* If the decided response is TRANS_ABORT */ - if(*(tdata->replyctrl) == TRANS_ABORT) { - retval = TRANS_ABORT; - } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ - retval = TRANS_COMMIT; } - /* Send response to the Participant */ - if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ctrl message for participant\n"); - } - - return retval; -} -/* This function opens a connection, places an object read request to the - * remote machine, reads the control message and object if available and - * copies the object and its header to the local cache. - * TODO replace mnum and midtoIP() with MACHINE_IP address later */ - -void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { - int sd, size, val; - struct sockaddr_in serv_addr; - struct hostent *server; - char control; - char machineip[16]; - objheader_t *h; - void *objcopy; - - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("Error in socket\n"); - return NULL; - } - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(LISTEN_PORT); - //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); - midtoIP(mnum,machineip); - machineip[15] = '\0'; - serv_addr.sin_addr.s_addr = inet_addr(machineip); - /* Open connection */ - if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("Error in connect\n"); - return NULL; - } - char readrequest[sizeof(char)+sizeof(unsigned int)]; - readrequest[0] = READ_REQUEST; - *((unsigned int *)(&readrequest[1])) = oid; - if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) { - perror("Error sending message\n"); - return NULL; + fclose(configFile); + + if (numHostsInSystem < 1) + { + printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME); + return -1; } - -#ifdef DEBUG1 - printf("DEBUG -> ready to rcv ...\n"); +#ifdef MAC + myIpAddr = getMyIpAddr("en1"); +#else + myIpAddr = getMyIpAddr("eth0"); #endif - /* Read response from the Participant */ - if((val = read(sd, &control, sizeof(char))) <= 0) { - perror("No control response for getRemoteObj sent\n"); - return NULL; - } - switch(control) { - case OBJECT_NOT_FOUND: - printf("DEBUG -> Control OBJECT_NOT_FOUND received\n"); - return NULL; - case OBJECT_FOUND: - /* Read object if found into local cache */ - if((val = read(sd, &size, sizeof(int))) <= 0) { - perror("No size is read from the participant\n"); - return NULL; - } - objcopy = objstrAlloc(record->cache, size); - if((val = read(sd, objcopy, size)) <= 0) { - perror("No objects are read from the remote participant\n"); - return NULL; - } - /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, oid, objcopy); - break; - default: - printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__); - return NULL; + myIndexInHostArray = findHost(myIpAddr); + if (myIndexInHostArray == -1) + { + printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME); + return -1; } - /* Close connection */ - close(sd); - return objcopy; + oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1; + oidMin = oidsPerBlock * myIndexInHostArray; + if (myIndexInHostArray == numHostsInSystem - 1) + oidMax = 0xFFFFFFFF; + else + oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1; + + return 0; } -/*This function handles the local trans requests involved in a transaction commiting process - * makes a decision if the local machine sends AGREE or DISAGREE or SOFT_ABORT - * Activates the other nonlocal threads that are waiting for the decision and the - * based on common decision by all groups involved in the transaction it - * either commits or aborts the transaction. - * It also frees the calloced memory resources - */ +void addHost(unsigned int hostIp) +{ + unsigned int *tmpArray; -//int handleLocalReq(thread_data_array_t *tdata, trans_commit_data_t *transinfo) { -void *handleLocalReq(void *threadarg) { - int val, i = 0; - short version; - char control = 0, *ptr; - unsigned int oid; - unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL; - void *mobj, *modptr; - objheader_t *headptr; - local_thread_data_array_t *localtdata; - - localtdata = (local_thread_data_array_t *) threadarg; - printf("DEBUG->Address of localtdata = %x\n", localtdata); - - printf("DEBUG-> Machine Pile numread recv = %d\n", localtdata->tdata->buffer->f.numread); - /* Counters and arrays to formulate decision on control message to be sent */ - printf("DEBUG -> %d %d\n",localtdata->tdata->buffer->f.numread, localtdata->tdata->buffer->f.nummod); - oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); - oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); - oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int)); - int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; - int objmodnotfound = 0, nummodfound = 0; - - /* modptr points to the beginning of the object store - * created at the Pariticipant */ - if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) { - printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__); - return NULL; + if (findHost(hostIp) != -1) + return; + + if (numHostsInSystem == sizeOfHostArray) + { + tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int)); + memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem); + free(hostIpAddrs); + hostIpAddrs = tmpArray; } - ptr = modptr; - - /* Process each oid in the machine pile/ group per thread */ - for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) { - if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified - int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array - incr *= i; - oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr)); - incr += sizeof(unsigned int); - version = *((short *)(localtdata->tdata->buffer->objread + incr)); - } else {//Objs modified - headptr = (objheader_t *) ptr; - oid = headptr->oid; - oidmod[objmod] = oid;//Array containing modified oids - objmod++; - version = headptr->version; - ptr += sizeof(objheader_t) + classsize[headptr->type]; - } + hostIpAddrs[numHostsInSystem++] = hostIp; - /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ - - /* Save the oids not found and number of oids not found for later use */ - if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ - /* Save the oids not found and number of oids not found for later use */ - - oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid; - objnotfound++; - } else { /* If Obj found in machine (i.e. has not moved) */ - /* Check if Obj is locked by any previous transaction */ - if ((((objheader_t *)mobj)->status & LOCK) == LOCK) { - if (version == ((objheader_t *)mobj)->version) { /* If not locked then match versions */ - v_matchlock++; - } else {/* If versions don't match ...HARD ABORT */ - v_nomatch++; - /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; - printf("DEBUG -> Sending TRANS_DISAGREE\n"); - //return tdata->recvmsg[tdata->thread_id].rcv_status; - } - } else {/* If Obj is not locked then lock object */ - ((objheader_t *)mobj)->status |= LOCK; - //TODO Remove this for Testing - randomdelay(); - - /* Save all object oids that are locked on this machine during this transaction request call */ - oidlocked[objlocked] = ((objheader_t *)mobj)->oid; - objlocked++; - if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */ - v_matchnolock++; - } else { /* If versions don't match ...HARD ABORT */ - v_nomatch++; - /* Send TRANS_DISAGREE to Coordinator */ - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE; - printf("DEBUG -> Sending TRANS_DISAGREE\n"); - // return tdata->recvmsg[tdata->thread_id].rcv_status; - } - } - } - } + return; +} - /*Decide the response to be sent to the Coordinator( the local machine in this case)*/ +int findHost(unsigned int hostIp) +{ + int i; + for (i = 0; i < numHostsInSystem; i++) + if (hostIpAddrs[i] == hostIp) + return i; - /* Condition to send TRANS_AGREE */ - if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) { - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; - printf("DEBUG -> Sending TRANS_AGREE\n"); - } - /* Condition to send TRANS_SOFT_ABORT */ - if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { - localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT; - printf("DEBUG -> Sending TRANS_SOFT_ABORT\n"); - /* Send number of oids not found and the missing oids if objects are missing in the machine */ - /* TODO Remember to store the oidnotfound for later use - if(objnotfound != 0) { - int size = sizeof(unsigned int)* objnotfound; - } - */ - } + //not found + return -1; +} - /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process - * if Participant receives a TRANS_COMMIT */ - localtdata->transinfo->objmod = oidmod; - localtdata->transinfo->objlocked = oidlocked; - localtdata->transinfo->objnotfound = oidnotfound; - localtdata->transinfo->modptr = modptr; - localtdata->transinfo->nummod = localtdata->tdata->buffer->f.nummod; - localtdata->transinfo->numlocked = objlocked; - localtdata->transinfo->numnotfound = objnotfound; - - /*Set flag to show that common data structure for this individual thread has been written to */ - //*(tdata->localstatus) |= LM_UPDATED; - - /* Lock and update count */ - //Thread sleeps until all messages from pariticipants are received by coordinator - pthread_mutex_lock(localtdata->tdata->lock); - (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */ - - /* Wake up the threads and invoke decideResponse (once) */ - if(*(localtdata->tdata->count) == localtdata->tdata->pilecount) { - if (decideResponse(localtdata->tdata) != 0) { - printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(localtdata->tdata->lock); - return NULL; - } - pthread_cond_broadcast(localtdata->tdata->threshold); - } else { - pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock); - } - pthread_mutex_unlock(localtdata->tdata->lock); +/* This function sends notification request per thread waiting on object(s) whose version + * changes */ +int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) { + int sock,i; + objheader_t *objheader; + struct sockaddr_in remoteAddr; + char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)]; + char *ptr; + int bytesSent; + int status, size; + unsigned short version; + unsigned int oid,mid; + static unsigned int threadid = 0; + pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification + pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER; + notifydata_t *ndata; + + oid = oidarry[0]; + if((mid = lhashSearch(oid)) == 0) { + printf("Error: %s() No such machine found for oid =%x\n",__func__, oid); + return; + } + + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("reqNotify():socket()"); + return -1; + } + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + + /* Generate unique threadid */ + threadid++; + + /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */ + if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) { + printf("Calloc Error %s, %d\n", __FILE__, __LINE__); + return -1; + } + ndata->numoid = numoid; + ndata->threadid = threadid; + ndata->oidarry = oidarry; + ndata->versionarry = versionarry; + ndata->threadcond = threadcond; + ndata->threadnotify = threadnotify; + if((status = notifyhashInsert(threadid, ndata)) != 0) { + printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__); + free(ndata); + return -1; + } + + /* Send number of oids, oidarry, version array, machine id and threadid */ + if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("reqNotify():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + free(ndata); + return -1; + } else { + msg[0] = THREAD_NOTIFY_REQUEST; + *((unsigned int *)(&msg[1])) = numoid; + /* Send array of oids */ + size = sizeof(unsigned int); + { + i = 0; + while(i < numoid) { + oid = oidarry[i]; + *((unsigned int *)(&msg[1] + size)) = oid; + size += sizeof(unsigned int); + i++; + } + } + + /* Send array of version */ + { + i = 0; + while(i < numoid) { + version = versionarry[i]; + *((unsigned short *)(&msg[1] + size)) = version; + size += sizeof(unsigned short); + i++; + } + } + + *((unsigned int *)(&msg[1] + size)) = myIpAddr; + size += sizeof(unsigned int); + *((unsigned int *)(&msg[1] + size)) = threadid; + pthread_mutex_lock(&(ndata->threadnotify)); + size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int); + send_data(sock, msg, size); + pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify)); + pthread_mutex_unlock(&(ndata->threadnotify)); + } + + pthread_cond_destroy(&threadcond); + pthread_mutex_destroy(&threadnotify); + free(ndata); + close(sock); + return status; +} - /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/ - if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){ - if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod) != 0) { - printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); - return NULL; +void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) { + notifydata_t *ndata; + int i, objIsFound = 0, index; + void *ptr; + + //Look up the tid and call the corresponding pthread_cond_signal + if((ndata = notifyhashSearch(tid)) == NULL) { + printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__); + return; + } else { + for(i = 0; i < ndata->numoid; i++) { + if(ndata->oidarry[i] == oid){ + objIsFound = 1; + index = i; + } } - }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){ - if(transComProcess(localtdata->transinfo) != 0) { - printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); - return NULL; + if(objIsFound == 0){ + printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__); + return; + } else { + if(version <= ndata->versionarry[index]){ + printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__); + return; + } else { + /* Clear from prefetch cache and free thread related data structure */ + if((ptr = prehashSearch(oid)) != NULL) { + prehashRemove(oid); + } + pthread_cond_signal(&(ndata->threadcond)); + } } } - - /* Free memory */ - printf("DEBUG -> Freeing...\n"); - fflush(stdout); - if (localtdata->transinfo->objmod != NULL) { - free(localtdata->transinfo->objmod); - localtdata->transinfo->objmod = NULL; - } - if (localtdata->transinfo->objlocked != NULL) { - free(localtdata->transinfo->objlocked); - localtdata->transinfo->objlocked = NULL; - } - if (localtdata->transinfo->objnotfound != NULL) { - free(localtdata->transinfo->objnotfound); - localtdata->transinfo->objnotfound = NULL; - } - - pthread_exit(NULL); + return; } -/* This function completes the ABORT process if the transaction is aborting - */ -int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) { - char *ptr; - int i; - objheader_t *tmp_header; - void *header; - - printf("DEBUG -> Recv TRANS_ABORT\n"); - /* Set all ref counts as 1 and do garbage collection */ - ptr = modptr; - for(i = 0; i< nummod; i++) { - tmp_header = (objheader_t *)ptr; - tmp_header->rcount = 1; - ptr += sizeof(objheader_t) + classsize[tmp_header->type]; - } - /* Unlock objects that was locked due to this transaction */ - for(i = 0; i< numlocked; i++) { - header = mhashSearch(objlocked[i]);// find the header address - ((objheader_t *)header)->status &= ~(LOCK); - } - /* Send ack to Coordinator */ - printf("DEBUG-> TRANS_SUCCESSFUL\n"); - - /*Free the pointer */ - ptr = NULL; - return 0; +int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { + threadlist_t *ptr; + unsigned int mid; + struct sockaddr_in remoteAddr; + char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)]; + int sock, status, size, bytesSent; + + while(*head != NULL) { + ptr = *head; + mid = ptr->mid; + //create a socket connection to that machine + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("notifyAll():socket()"); + return -1; + } + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + //send Thread Notify response and threadid to that machine + if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("notifyAll():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + status = -1; + fflush(stdout); + } else { + bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int))); + msg[0] = THREAD_NOTIFY_RESPONSE; + *((unsigned int *)&msg[1]) = oid; + size = sizeof(unsigned int); + *((unsigned short *)(&msg[1]+ size)) = version; + size+= sizeof(unsigned short); + *((unsigned int *)(&msg[1]+ size)) = ptr->threadid; + + size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short); + send_data(sock, msg, size); + } + //close socket + close(sock); + // Update head + *head = ptr->next; + free(ptr); + } + return status; } -/*This function completes the COMMIT process is the transaction is commiting - */ - int transComProcess(trans_commit_data_t *transinfo) { - objheader_t *header; - int i = 0, offset = 0; - char control; - - printf("DEBUG -> Recv TRANS_COMMIT\n"); - /* Process each modified object saved in the mainobject store */ - for(i=0; inummod; i++) { - if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) { - printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); - } - /* Change reference count of older address and free space in objstr ?? */ - header->rcount = 1; //TODO Not sure what would be the val - - /* Change ptr address in mhash table */ - printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]); - mhashRemove(transinfo->objmod[i]); - mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset)); - offset += sizeof(objheader_t) + classsize[header->type]; - - /* Update object version number */ - header = (objheader_t *) mhashSearch(transinfo->objmod[i]); - header->version += 1; - } - - /* Unlock locked objects */ - for(i=0; inumlocked; i++) { - header = (objheader_t *) mhashSearch(transinfo->objlocked[i]); - header->status &= ~(LOCK); - } - - //TODO Update location lookup table - - /* Send ack to Coordinator */ - printf("DEBUG-> TRANS_SUCESSFUL\n"); - return 0; - } - -/*This function makes piles to prefetch records and prefetches the oids from remote machines */ -int transPrefetch(transrecord_t *record, trans_prefetchtuple_t *prefetchtuple){ - /* Create Pile*/ - /* For each Pile in the machine send TRANS_PREFETCH */ +void transAbort(transrecord_t *trans) { + objstrDelete(trans->cache); + chashDelete(trans->lookupTable); + free(trans); }