#include "dstm.h"
#include "ip.h"
-#include "clookup.h"
#include "machinepile.h"
#include "mlookup.h"
#include "llookup.h"
#include "threadnotify.h"
#include "queue.h"
#include "addUdpEnhance.h"
+#include "addPrefetchEnhance.h"
#include "gCollect.h"
+#include "dsmlock.h"
+#include "prefetch.h"
#ifdef COMPILER
#include "thread.h"
#endif
#define PREFETCH_CACHE_SIZE 1048576 //1MB
#define CONFIG_FILENAME "dstm.conf"
+
/* Global Variables */
extern int classsize[];
-
+pfcstats_t *evalPrefetch;
+extern int numprefetchsites; //Global variable containing number of prefetch sites
+extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
objstr_t *prefetchcache; //Global Prefetch cache
-pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
+pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
-extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
-pthread_t tPrefetch; /* Primary Prefetch thread that processes the prefetch queue */
+pthread_t tPrefetch; /* Primary Prefetch thread that processes the prefetch queue */
extern objstr_t *mainobjstore;
unsigned int myIpAddr;
unsigned int *hostIpAddrs;
sockPoolHashTable_t *transReadSockPool;
sockPoolHashTable_t *transPrefetchSockPool;
+sockPoolHashTable_t *transRequestSockPool;
pthread_mutex_t notifymutex;
pthread_mutex_t atomicObjLock;
**********************************/
int numTransCommit = 0;
int numTransAbort = 0;
+int nchashSearch = 0;
+int nmhashSearch = 0;
+int nprehashSearch = 0;
+int nRemoteSend = 0;
+int nSoftAbort = 0;
+int bytesSent = 0;
+int bytesRecv = 0;
void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
+plistnode_t *sortPiles(plistnode_t *pileptr);
/*******************************
- * Send and Recv function calls
- *******************************/
-void send_data(int fd , void *buf, int buflen) {
- char *buffer = (char *)(buf);
+* Send and Recv function calls
+*******************************/
+void send_data(int fd, void *buf, int buflen) {
+ char *buffer = (char *)(buf);
int size = buflen;
- int numbytes;
+ int numbytes;
while (size > 0) {
numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
+ bytesSent = bytesSent + numbytes;
if (numbytes == -1) {
perror("send");
- exit(-1);
+ exit(0);
}
buffer += numbytes;
size -= numbytes;
}
}
-void recv_data(int fd , void *buf, int buflen) {
- char *buffer = (char *)(buf);
+void recv_data(int fd, void *buf, int buflen) {
+ char *buffer = (char *)(buf);
int size = buflen;
- int numbytes;
+ int numbytes;
while (size > 0) {
numbytes = recv(fd, buffer, size, 0);
+ bytesRecv = bytesRecv + numbytes;
if (numbytes == -1) {
perror("recv");
- exit(-1);
+ exit(0);
}
buffer += numbytes;
size -= numbytes;
}
}
-int recv_data_errorcode(int fd , void *buf, int buflen) {
- char *buffer = (char *)(buf);
+int recv_data_errorcode(int fd, void *buf, int buflen) {
+ char *buffer = (char *)(buf);
int size = buflen;
- int numbytes;
+ int numbytes;
while (size > 0) {
numbytes = recv(fd, buffer, size, 0);
if (numbytes==0)
inline int arrayLength(int *array) {
int i;
- for(i=0 ;array[i] != -1; i++)
+ for(i=0 ; array[i] != -1; i++)
;
return i;
}
inline int findmax(int *array, int arraylength) {
int max, i;
max = array[0];
- for(i = 0; i < arraylength; i++){
+ for(i = 0; i < arraylength; i++) {
if(array[i] > max) {
max = array[i];
}
/* This function is a prefetch call generated by the compiler that
* populates the shared primary prefetch queue*/
-void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
+void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
/* Allocate for the queue node*/
- int qnodesize = sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
- char * node= getmemory(qnodesize);
- /* Set queue node values */
+ int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
int len;
+ char * node= getmemory(qnodesize);
int top=endoffsets[ntuples-1];
- *((int *)(node))=ntuples;
- len = sizeof(int);
+ if (node==NULL)
+ return;
+ /* Set queue node values */
+
+ /* TODO: Remove this after testing */
+ evalPrefetch[siteid].callcount++;
+
+ *((int *)(node))=siteid;
+ *((int *)(node + sizeof(int))) = ntuples;
+ len = 2*sizeof(int);
memcpy(node+len, oids, ntuples*sizeof(unsigned int));
memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
printf("Trans stats is on\n");
fflush(stdout);
#endif
-
+
//Initialize socket pool
- transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
- transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1);
-
+ transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
+ transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
+ transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
+
dstmInit();
transInit();
-
+
fd=startlistening();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+#ifdef CACHE
udpfd = udpInit();
pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
+#endif
if (master) {
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
return 1;
} else {
objstr_t *ptr;
ptr = store;
int success = 0;
-
+
while(ptr->next != NULL) {
/* check if store is empty */
if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
success = 1;
return tmp;
} else {
- ptr = ptr-> next;
+ ptr = ptr->next;
}
}
-
+
if(success == 0) {
return NULL;
}
* prefetch requests */
void transInit() {
- int t, rc;
- int retval;
//Create and initialize prefetch cache structure
+#ifdef CACHE
prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
initializePCache();
-
+ if((evalPrefetch = initPrefetchStats()) == NULL) {
+ printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
+ exit(0);
+ }
+#endif
+
/* Initialize attributes for mutex */
pthread_mutexattr_init(&prefetchcache_mutex_attr);
pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
-
+
pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
-
pthread_mutex_init(¬ifymutex, NULL);
pthread_mutex_init(&atomicObjLock, NULL);
+#ifdef CACHE
//Create prefetch cache lookup table
if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
printf("ERROR\n");
return; //Failure
}
-
+
//Initialize primary shared queue
queueInit();
//Initialize machine pile w/prefetch oids and offsets shared queue
mcpileqInit();
-
- //Create the primary prefetch thread
+
+ //Create the primary prefetch thread
+ int retval;
+#ifdef RANGEPREFETCH
+ do {
+ retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
+ } while(retval!=0);
+#else
do {
retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
} while(retval!=0);
+#endif
pthread_detach(tPrefetch);
+#endif
}
/* This function stops the threads spawned */
void transExit() {
+#ifdef CACHE
int t;
pthread_cancel(tPrefetch);
for(t = 0; t < NUM_THREADS; t++)
pthread_cancel(wthreads[t]);
-
+#endif
+
return;
}
void randomdelay() {
struct timespec req;
time_t t;
-
+
t = time(NULL);
req.tv_sec = 0;
- req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
+ req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
nanosleep(&req, NULL);
return;
}
/* This function initializes things required in the transaction start*/
-transrecord_t *transStart() {
+__attribute__((malloc)) transrecord_t *transStart() {
transrecord_t *tmp;
- if((tmp = calloc(1, sizeof(transrecord_t))) == NULL){
+ if((tmp = calloc(1, sizeof(transrecord_t))) == NULL) {
printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
return NULL;
}
tmp->cache = objstrCreate(1048576);
- tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
-#ifdef COMPILER
- tmp->revertlist=NULL;
-#endif
+ tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
+ //#ifdef COMPILER
+ // tmp->revertlist=NULL; //Not necessary...already null
+ //#endif
return tmp;
}
+// Search for an address for a given oid
+/*#define INLINE inline __attribute__((always_inline))
+
+INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
+ //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE
+ chashlistnode_t *node = &table->table[(key & table->mask)>>1];
+
+ do {
+ if(node->key == key) {
+ return node->val;
+ }
+ node = node->next;
+ } while(node != NULL);
+
+ return NULL;
+ }*/
+
+
/* This function finds the location of the objects involved in a transaction
* and returns the pointer to the object if found in a remote location */
-objheader_t *transRead(transrecord_t *record, unsigned int oid) {
+__attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int oid) {
unsigned int machinenumber;
objheader_t *tmp, *objheader;
objheader_t *objcopy;
- int size, found = 0;
+ int size;
void *buf;
-
+ chashlistnode_t *node;
+ chashtable_t *table=record->lookupTable;
+
if(oid == 0) {
return NULL;
}
- if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
- /* Search local transaction cache */
+ node= &table->table[(oid & table->mask)>>1];
+ do {
+ if(node->key == oid) {
+#ifdef TRANSSTATS
+ nchashSearch++;
+#endif
+#ifdef COMPILER
+ return &((objheader_t*)node->val)[1];
+#else
+ return node->val;
+#endif
+ }
+ node = node->next;
+ } while(node != NULL);
+
+
+ /*
+ if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
+#ifdef TRANSSTATS
+ nchashSearch++;
+#endif
#ifdef COMPILER
return &objheader[1];
#else
return objheader;
#endif
- } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
+ } else
+ */
+
+#ifdef ABORTREADERS
+ if (trans->abort) {
+ //abort this transaction
+ longjmp(trans->aborttrans,1);
+ } else
+ addtransaction(oid,record);
+#endif
+
+ if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
+#ifdef TRANSSTATS
+ nmhashSearch++;
+#endif
/* Look up in machine lookup table and copy into cache*/
GETSIZE(size, objheader);
size += sizeof(objheader_t);
memcpy(objcopy, objheader, size);
/* Insert into cache's lookup table */
STATUS(objcopy)=0;
- chashInsert(record->lookupTable, OID(objheader), objcopy);
+ chashInsert(record->lookupTable, OID(objheader), objcopy);
#ifdef COMPILER
return &objcopy[1];
#else
return objcopy;
#endif
- } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
- /* Look up in prefetch cache */
- GETSIZE(size, tmp);
- size+=sizeof(objheader_t);
- objcopy = (objheader_t *) objstrAlloc(record->cache, size);
- memcpy(objcopy, tmp, size);
- /* Insert into cache's lookup table */
- chashInsert(record->lookupTable, OID(tmp), objcopy);
+ } else {
+#ifdef CACHE
+ if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+#ifdef TRANSSTATS
+ nprehashSearch++;
+#endif
+ /* Look up in prefetch cache */
+ GETSIZE(size, tmp);
+ size+=sizeof(objheader_t);
+ objcopy = (objheader_t *) objstrAlloc(record->cache, size);
+ memcpy(objcopy, tmp, size);
+ /* Insert into cache's lookup table */
+ chashInsert(record->lookupTable, OID(tmp), objcopy);
#ifdef COMPILER
- return &objcopy[1];
+ return &objcopy[1];
#else
- return objcopy;
+ return objcopy;
+#endif
+ }
#endif
- } else {
/* Get the object from the remote location */
if((machinenumber = lhashSearch(oid)) == 0) {
printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
return NULL;
}
objcopy = getRemoteObj(record, machinenumber, oid);
-
+
if(objcopy == NULL) {
printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
return NULL;
} else {
- STATUS(objcopy)=0;
+#ifdef TRANSSTATS
+ nRemoteSend++;
+#endif
+ STATUS(objcopy)=0;
#ifdef COMPILER
return &objcopy[1];
#else
tmp->rcount = 1;
STATUS(tmp) = NEW;
chashInsert(record->lookupTable, OID(tmp), tmp);
-
+
#ifdef COMPILER
return &tmp[1]; //want space after object header
#else
#endif
}
+#if 1
/* This function creates machine piles based on all machines involved in a
* transaction commit request */
plistnode_t *createPiles(transrecord_t *record) {
chashlistnode_t * ptr = record->lookupTable->table;
/* Represents number of bins in the chash table */
unsigned int size = record->lookupTable->size;
-
+
for(i = 0; i < size ; i++) {
chashlistnode_t * curr = &ptr[i];
/* Inner loop to traverse the linked list of the cache lookupTable */
//if the first bin in hash table is empty
if(curr->key == 0)
break;
-
- if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
- printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
- return NULL;
- }
-
+ headeraddr=(objheader_t *) curr->val;
+
//Get machine location for object id (and whether local or not)
if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
machinenum = myIpAddr;
printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
return NULL;
}
-
+
//Make machine groups
pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
curr = curr->next;
}
}
- return pile;
+ return pile;
}
+#else
+/* This function creates machine piles based on all machines involved in a
+ * transaction commit request */
+plistnode_t *createPiles(transrecord_t *record) {
+ int i;
+ plistnode_t *pile = NULL;
+ unsigned int machinenum;
+ objheader_t *headeraddr;
+ struct chashentry * ptr = record->lookupTable->table;
+ /* Represents number of bins in the chash table */
+ unsigned int size = record->lookupTable->size;
+
+ for(i = 0; i < size ; i++) {
+ struct chashentry * curr = & ptr[i];
+ /* Inner loop to traverse the linked list of the cache lookupTable */
+ //if the first bin in hash table is empty
+ if(curr->key == 0)
+ continue;
+ headeraddr=(objheader_t *) curr->ptr;
+
+ //Get machine location for object id (and whether local or not)
+ if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
+ machinenum = myIpAddr;
+ } else if ((machinenum = lhashSearch(curr->key)) == 0) {
+ printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+
+ //Make machine groups
+ pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
+ }
+ return pile;
+}
+#endif
/* This function initiates the transaction commit process
- * Spawns threads for each of the new connections with Participants
- * and creates new piles by calling the createPiles(),
- * Sends a transrequest() to each remote machines for objects found remotely
+ * Spawns threads for each of the new connections with Participants
+ * and creates new piles by calling the createPiles(),
+ * Sends a transrequest() to each remote machines for objects found remotely
* and calls handleLocalReq() to process objects found locally */
-int transCommit(transrecord_t *record) {
+int transCommit(transrecord_t *record) {
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile, *pile_ptr;
- int i, j, rc, val;
- int pilecount, offset, threadnum, trecvcount;
- char control;
- char transid[TID_LEN];
- trans_req_data_t *tosend;
- trans_commit_data_t transinfo;
- static int newtid = 0;
- char treplyctrl, treplyretry; /* keeps track of the common response that needs to be sent */
- thread_data_array_t *thread_data_array;
- local_thread_data_array_t *ltdata;
+ int trecvcount;
+ char treplyretry; /* keeps track of the common response that needs to be sent */
int firsttime=1;
+ trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
+ char finalResponse;
- do {
- treplyctrl=0;
- trecvcount = 0;
- threadnum = 0;
+ do {
+ trecvcount = 0;
treplyretry = 0;
- thread_data_array = NULL;
- ltdata = NULL;
-
- /* Look through all the objects in the transaction record and make piles
+
+ /* Look through all the objects in the transaction record and make piles
* for each machine involved in the transaction*/
- if (firsttime)
+ if (firsttime) {
pile_ptr = pile = createPiles(record);
- else
- pile=pile_ptr;
- firsttime=0;
-
+ pile_ptr = pile = sortPiles(pile);
+ } else {
+ pile = pile_ptr;
+ }
+ firsttime = 0;
/* Create the packet to be sent in TRANS_REQUEST */
-
+
/* Count the number of participants */
+ int pilecount;
pilecount = pCount(pile);
-
- /* Create a list of machine ids(Participants) involved in transaction */
+
+ /* Create a list of machine ids(Participants) involved in transaction */
listmid = calloc(pilecount, sizeof(unsigned int));
pListMid(pile, listmid);
-
-
- /* Initialize thread variables,
- * Spawn a thread for each Participant involved in a transaction */
- pthread_t thread[pilecount];
- pthread_attr_t attr;
- pthread_cond_t tcond;
- pthread_mutex_t tlock;
- pthread_mutex_t tlshrd;
-
- thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
-
- ltdata = calloc(1, sizeof(local_thread_data_array_t));
-
- thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
-
- /* Initialize and set thread detach attribute */
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- pthread_mutex_init(&tlock, NULL);
- pthread_cond_init(&tcond, NULL);
+
+ /* Create a socket and getReplyCtrl array, initialize */
+ int socklist[pilecount];
+ int loopcount;
+ for(loopcount = 0 ; loopcount < pilecount; loopcount++)
+ socklist[loopcount] = 0;
+ char getReplyCtrl[pilecount];
+ for(loopcount = 0 ; loopcount < pilecount; loopcount++)
+ getReplyCtrl[loopcount] = 0;
/* Process each machine pile */
+ int sockindex = 0;
+ trans_req_data_t *tosend;
+ tosend = calloc(pilecount, sizeof(trans_req_data_t));
while(pile != NULL) {
- //Create transaction id
- newtid++;
- tosend = calloc(1, sizeof(trans_req_data_t));
- tosend->f.control = TRANS_REQUEST;
- sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
- tosend->f.mcount = pilecount;
- tosend->f.numread = pile->numread;
- tosend->f.nummod = pile->nummod;
- tosend->f.numcreated = pile->numcreated;
- tosend->f.sum_bytes = pile->sum_bytes;
- tosend->listmid = listmid;
- tosend->objread = pile->objread;
- tosend->oidmod = pile->oidmod;
- tosend->oidcreated = pile->oidcreated;
- thread_data_array[threadnum].thread_id = threadnum;
- thread_data_array[threadnum].mid = pile->mid;
- thread_data_array[threadnum].buffer = tosend;
- thread_data_array[threadnum].recvmsg = rcvd_control_msg;
- thread_data_array[threadnum].threshold = &tcond;
- thread_data_array[threadnum].lock = &tlock;
- thread_data_array[threadnum].count = &trecvcount;
- thread_data_array[threadnum].replyctrl = &treplyctrl;
- thread_data_array[threadnum].replyretry = &treplyretry;
- thread_data_array[threadnum].rec = record;
- /* If local do not create any extra connection */
- if(pile->mid != myIpAddr) { /* Not local */
- do {
- rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);
- } while(rc!=0);
- if(rc) {
- perror("Error in pthread create\n");
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
+ tosend[sockindex].f.control = TRANS_REQUEST;
+ tosend[sockindex].f.mcount = pilecount;
+ tosend[sockindex].f.numread = pile->numread;
+ tosend[sockindex].f.nummod = pile->nummod;
+ tosend[sockindex].f.numcreated = pile->numcreated;
+ tosend[sockindex].f.sum_bytes = pile->sum_bytes;
+ tosend[sockindex].listmid = listmid;
+ tosend[sockindex].objread = pile->objread;
+ tosend[sockindex].oidmod = pile->oidmod;
+ tosend[sockindex].oidcreated = pile->oidcreated;
+ int sd = 0;
+ if(pile->mid != myIpAddr) {
+ if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
+ printf("transRequest(): socket create error\n");
free(listmid);
- for (i = 0; i < threadnum; i++)
- free(thread_data_array[i].buffer);
- free(thread_data_array);
- free(ltdata);
+ free(tosend);
return 1;
}
- } else { /*Local*/
- ltdata->tdata = &thread_data_array[threadnum];
- ltdata->transinfo = &transinfo;
- do {
- val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
- } while(val!=0);
- if(val) {
- perror("Error in pthread create\n");
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
+ socklist[sockindex] = sd;
+ /* Send bytes of data with TRANS_REQUEST control message */
+ send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+
+ /* Send list of machines involved in the transaction */
+ {
+ int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
+ send_data(sd, tosend[sockindex].listmid, size);
+ }
+
+ /* Send oids and version number tuples for objects that are read */
+ {
+ int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
+ send_data(sd, tosend[sockindex].objread, size);
+ }
+
+ /* Send objects that are modified */
+ void *modptr;
+ if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
+ printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
free(listmid);
- for (i = 0; i < threadnum; i++)
- free(thread_data_array[i].buffer);
- free(thread_data_array);
- free(ltdata);
+ free(tosend);
return 1;
}
+ int offset = 0;
+ int i;
+ for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
+ int size;
+ objheader_t *headeraddr;
+ if((headeraddr = chashSearch(record->lookupTable, tosend[sockindex].oidmod[i])) == NULL) {
+ printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
+ free(modptr);
+ free(listmid);
+ free(tosend);
+ return 1;
+ }
+ GETSIZE(size,headeraddr);
+ size+=sizeof(objheader_t);
+ memcpy(modptr+offset, headeraddr, size);
+ offset+=size;
+ }
+ send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+ free(modptr);
+ } else { //handle request locally
+ handleLocalReq(&tosend[sockindex], &transinfo, record, &getReplyCtrl[sockindex]);
}
-
- threadnum++;
+ sockindex++;
pile = pile->next;
+ } //end of pile processing
+ /* Recv Ctrl msgs from all machines */
+ int i;
+ for(i = 0; i < pilecount; i++) {
+ int sd = socklist[i];
+ if(sd != 0) {
+ char control;
+ recv_data(sd, &control, sizeof(char));
+ //Update common data structure with new ctrl msg
+ getReplyCtrl[i] = control;
+ /* Recv Objects if participant sends TRANS_DISAGREE */
+#ifdef CACHE
+ if(control == TRANS_DISAGREE) {
+ int length;
+ recv_data(sd, &length, sizeof(int));
+ void *newAddr;
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
+ printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ recv_data(sd, newAddr, length);
+ int offset = 0;
+ while(length != 0) {
+ unsigned int oidToPrefetch;
+ objheader_t * header;
+ header = (objheader_t *)(((char *)newAddr) + offset);
+ oidToPrefetch = OID(header);
+ int size = 0;
+ GETSIZE(size, header);
+ size += sizeof(objheader_t);
+ //make an entry in prefetch hash table
+ void *oldptr;
+ if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
+ prehashRemove(oidToPrefetch);
+ prehashInsert(oidToPrefetch, header);
+ } else {
+ prehashInsert(oidToPrefetch, header);
+ }
+ length = length - size;
+ offset += size;
+ }
+ } //end of receiving objs
+#endif
+ }
}
- /* Free attribute and wait for the other threads */
- pthread_attr_destroy(&attr);
-
- for (i = 0; i < threadnum; i++) {
- rc = pthread_join(thread[i], NULL);
- if(rc)
- {
- printf("Error: return code from pthread_join() is %d\n", rc);
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
- free(listmid);
- for (j = i; j < threadnum; j++) {
- free(thread_data_array[j].buffer);
+ /* Decide the final response */
+ if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, record, pilecount)) == 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ return 1;
+ }
+
+ /* Send responses to all machines */
+ for(i = 0; i < pilecount; i++) {
+ int sd = socklist[i];
+ if(sd != 0) {
+#ifdef CACHE
+ if(finalResponse == TRANS_COMMIT) {
+ int retval;
+ /* Update prefetch cache */
+ if((retval = updatePrefetchCache(&(tosend[i]), record)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ return 1;
}
- return 1;
+
+
+ /* Invalidate objects in other machine cache */
+ if(tosend[i].f.nummod > 0) {
+ if((retval = invalidateObj(&(tosend[i]))) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ free(tosend);
+ free(listmid);
+ return 1;
+ }
+ }
+#ifdef ABORTREADERS
+ removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread, record);
+#endif
+ }
+#ifdef ABORTREADERS
+ else if (!treplyretry) {
+ removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record);
+ removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread,record);
}
- free(thread_data_array[i].buffer);
+#endif
+#endif
+ send_data(sd, &finalResponse, sizeof(char));
+ } else {
+ /* Complete local processing */
+ doLocalProcess(finalResponse, &(tosend[i]), &transinfo, record);
+#ifdef ABORTREADERS
+ if(finalResponse == TRANS_COMMIT) {
+ removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+ removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread, record);
+ } else if (!treplyretry) {
+ removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record);
+ removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread,record);
+ }
+#endif
+ }
}
- /* Free resources */
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
+ /* Free resources */
+ free(tosend);
free(listmid);
-
if (!treplyretry)
pDelete(pile_ptr);
-
/* wait a random amount of time before retrying to commit transaction*/
if(treplyretry) {
- free(thread_data_array);
- free(ltdata);
randomdelay();
+#ifdef TRANSSTATS
+ nSoftAbort++;
+#endif
}
-
/* Retry trans commit procedure during soft_abort case */
} while (treplyretry);
-
- if(treplyctrl == TRANS_ABORT) {
+
+ if(finalResponse == TRANS_ABORT) {
+ //printf("Aborting trans\n");
#ifdef TRANSSTATS
numTransAbort++;
#endif
objstrDelete(record->cache);
chashDelete(record->lookupTable);
free(record);
- free(thread_data_array);
- free(ltdata);
return TRANS_ABORT;
- } else if(treplyctrl == TRANS_COMMIT) {
+ } else if(finalResponse == TRANS_COMMIT) {
#ifdef TRANSSTATS
numTransCommit++;
#endif
objstrDelete(record->cache);
chashDelete(record->lookupTable);
free(record);
- free(thread_data_array);
- free(ltdata);
return 0;
} else {
//TODO Add other cases
return 0;
}
-/* This function sends information involved in the transaction request
- * to participants and accepts a response from particpants.
- * It calls decideresponse() to decide on what control message
- * to send next to participants and sends the message using sendResponse()*/
-void *transRequest(void *threadarg) {
- int sd, i, n;
- struct sockaddr_in serv_addr;
- thread_data_array_t *tdata;
- objheader_t *headeraddr;
- char control, recvcontrol;
- char machineip[16], retval;
-
- tdata = (thread_data_array_t *) threadarg;
-
- /* Send Trans Request */
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("Error in socket for TRANS_REQUEST\n");
- pthread_exit(NULL);
- }
- bzero((char*) &serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_port = htons(LISTEN_PORT);
- serv_addr.sin_addr.s_addr = htonl(tdata->mid);
-
- /* Open Connection */
- if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
- perror("transRequest() connect");
- close(sd);
- pthread_exit(NULL);
- }
-
- /* Send bytes of data with TRANS_REQUEST control message */
- send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
-
- /* Send list of machines involved in the transaction */
- {
- int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
- send_data(sd, tdata->buffer->listmid, size);
+/* This function handles the local objects involved in a transaction
+ * commiting process. It also makes a decision if this local machine
+ * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
+void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec, char *getReplyCtrl) {
+ unsigned int *oidnotfound = NULL, *oidlocked = NULL;
+ int numoidnotfound = 0, numoidlocked = 0;
+ int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
+ int numread, i;
+ unsigned int oid;
+ unsigned short version;
+
+ /* Counters and arrays to formulate decision on control message to be sent */
+ oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
+ oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
+ //setting a divider between read and write locks
+ numread = tdata->f.numread;
+ /* Process each oid in the machine pile/ group per thread */
+ for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
+ if (i < tdata->f.numread) {
+ int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
+ incr *= i;
+ oid = *((unsigned int *)(((char *)tdata->objread) + incr));
+ version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
+ commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
+ } else { // Objects Modified
+ if(i == tdata->f.numread) {
+ oidlocked[numoidlocked++] = -1;
+ }
+ int tmpsize;
+ objheader_t *headptr;
+ headptr = (objheader_t *) chashSearch(rec->lookupTable, tdata->oidmod[i-numread]);
+ if (headptr == NULL) {
+ printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+ oid = OID(headptr);
+ version = headptr->version;
+ commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
+ }
}
-
- /* Send oids and version number tuples for objects that are read */
- {
- int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
- send_data(sd, tdata->buffer->objread, size);
+
+/* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
+ * if Participant receives a TRANS_COMMIT */
+ transinfo->objlocked = oidlocked;
+ transinfo->objnotfound = oidnotfound;
+ transinfo->modptr = NULL;
+ transinfo->numlocked = numoidlocked;
+ transinfo->numnotfound = numoidnotfound;
+
+ /* Condition to send TRANS_AGREE */
+ if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
+ *getReplyCtrl = TRANS_AGREE;
}
-
- /* Send objects that are modified */
- for(i = 0; i < tdata->buffer->f.nummod ; i++) {
- int size;
- headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
- GETSIZE(size,headeraddr);
- size+=sizeof(objheader_t);
- send_data(sd, headeraddr, size);
+ /* Condition to send TRANS_SOFT_ABORT */
+ if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
+ *getReplyCtrl = TRANS_SOFT_ABORT;
}
-
- /* Read control message from Participant */
- recv_data(sd, &control, sizeof(char));
- /* Recv Objects if participant sends TRANS_DISAGREE */
- if(control == TRANS_DISAGREE) {
- int length;
- recv_data(sd, &length, sizeof(int));
- void *newAddr;
- pthread_mutex_lock(&prefetchcache_mutex);
- if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
- printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- close(sd);
- pthread_exit(NULL);
+}
+
+void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *record) {
+ if(finalResponse == TRANS_ABORT) {
+ if(transAbortProcess(transinfo) != 0) {
+ printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
+ return;
}
- pthread_mutex_unlock(&prefetchcache_mutex);
- recv_data(sd, newAddr, length);
- int offset = 0;
- while(length != 0) {
- unsigned int oidToPrefetch;
- objheader_t * header;
- header = (objheader_t *) (((char *)newAddr) + offset);
- oidToPrefetch = OID(header);
- int size = 0;
- GETSIZE(size, header);
- size += sizeof(objheader_t);
- //make an entry in prefetch hash table
- void *oldptr;
- if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
- prehashRemove(oidToPrefetch);
- prehashInsert(oidToPrefetch, header);
- } else {
- prehashInsert(oidToPrefetch, header);
+ } else if(finalResponse == TRANS_COMMIT) {
+#ifdef CACHE
+ /* Invalidate objects in other machine cache */
+ if(tdata->f.nummod > 0) {
+ int retval;
+ if((retval = invalidateObj(tdata)) != 0) {
+ printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
}
- length = length - size;
- offset += size;
}
- }
- recvcontrol = control;
- /* Update common data structure and increment count */
- tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
-
- /* Lock and update count */
- /* Thread sleeps until all messages from pariticipants are received by coordinator */
- pthread_mutex_lock(tdata->lock);
-
- (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
-
- /* Wake up the threads and invoke decideResponse (once) */
- if(*(tdata->count) == tdata->buffer->f.mcount) {
- decideResponse(tdata);
- pthread_cond_broadcast(tdata->threshold);
+#endif
+ if(transComProcess(tdata, transinfo, record) != 0) {
+ printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+ fflush(stdout);
+ return;
+ }
} else {
- pthread_cond_wait(tdata->threshold, tdata->lock);
+ printf("ERROR...No Decision\n");
}
- pthread_mutex_unlock(tdata->lock);
-
- /* Send the final response such as TRANS_COMMIT or TRANS_ABORT
- * to all participants in their respective socket */
- if (sendResponse(tdata, sd) == 0) {
- printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
- close(sd);
- pthread_exit(NULL);
+
+ /* Free memory */
+ if (transinfo->objlocked != NULL) {
+ free(transinfo->objlocked);
}
-
- recv_data((int)sd, &control, sizeof(char));
-
- if(control == TRANS_UNSUCESSFUL) {
- //printf("DEBUG-> TRANS_ABORTED\n");
- } else if(control == TRANS_SUCESSFUL) {
- //printf("DEBUG-> TRANS_SUCCESSFUL\n");
- } else {
- //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
+ if (transinfo->objnotfound != NULL) {
+ free(transinfo->objnotfound);
}
-
- /* Close connection */
- close(sd);
- pthread_exit(NULL);
}
-/* This function decides the reponse that needs to be sent to
+/* This function decides the reponse that needs to be sent to
* all Participant machines after the TRANS_REQUEST protocol */
-void decideResponse(thread_data_array_t *tdata) {
- char control;
+char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record, int pilecount) {
int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
message to send */
-
- for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
- control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
- written onto the shared array */
+ for (i = 0 ; i < pilecount; i++) {
+ char control;
+ control = getReplyCtrl[i];
switch(control) {
default:
printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+
/* treat as disagree, pass thru */
case TRANS_DISAGREE:
transdisagree++;
break;
-
+
case TRANS_AGREE:
transagree++;
break;
-
+
case TRANS_SOFT_ABORT:
transsoftabort++;
break;
}
}
-
+
if(transdisagree > 0) {
/* Send Abort */
- *(tdata->replyctrl) = TRANS_ABORT;
- *(tdata->replyretry) = 0;
+ *treplyretry = 0;
+ return TRANS_ABORT;
+#ifdef CACHE
/* clear objects from prefetch cache */
- for (i = 0; i < tdata->buffer->f.numread; i++) {
- prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
- }
- for (i = 0; i < tdata->buffer->f.nummod; i++) {
- prehashRemove(tdata->buffer->oidmod[i]);
- }
- } else if(transagree == tdata->buffer->f.mcount){
+ cleanPCache(record);
+#endif
+ } else if(transagree == pilecount) {
/* Send Commit */
- *(tdata->replyctrl) = TRANS_COMMIT;
- *(tdata->replyretry) = 0;
- /* update prefetch cache */
- /* For objects read */
- char oidType;
- int retval;
- oidType = 'R';
- if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) {
- printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- oidType = 'M';
- if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) {
- printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- /* Invalidate objects in other machine cache */
- if(tdata->buffer->f.nummod > 0) {
- if((retval = invalidateObj(tdata)) != 0) {
- printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
- }
- } else {
- /* Send Abort in soft abort case followed by retry commiting transaction again*/
- *(tdata->replyctrl) = TRANS_ABORT;
- *(tdata->replyretry) = 1;
- }
- return;
-}
-
-/* This function updates the prefetch cache when commiting objects
- * based on the type of oid i.e. if oid is read or oid is modified
- * Return -1 on error else returns 0
- */
-int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
- int i;
- for (i = 0; i < numoid; i++) {
- //find address object
- objheader_t *header, *newAddr;
- int size;
- unsigned int oid;
- if(oidType == 'R') {
- oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i));
- } else {
- oid = tdata->buffer->oidmod[i];
- }
- pthread_mutex_lock(&prefetchcache_mutex);
- header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
- //copy object into prefetch cache
- GETSIZE(size, header);
- if ((newAddr = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
- printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
- pthread_mutex_unlock(&prefetchcache_mutex);
- return -1;
- }
- pthread_mutex_unlock(&prefetchcache_mutex);
- memcpy(newAddr, header, (size + sizeof(objheader_t)));
- //make an entry in prefetch hash table
- void *oldptr;
- if((oldptr = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
- prehashInsert(oid, newAddr);
- } else {
- prehashInsert(oid, newAddr);
- }
+ *treplyretry = 0;
+ return TRANS_COMMIT;
+ } else {
+ /* Send Abort in soft abort case followed by retry commiting transaction again*/
+ *treplyretry = 1;
+ return TRANS_ABORT;
}
return 0;
}
-
-/* This function sends the final response to remote machines per
- * thread in their respective socket id It returns a char that is only
- * needed to check the correctness of execution of this function
- * inside transRequest()*/
-
-char sendResponse(thread_data_array_t *tdata, int sd) {
- int n, size, sum, oidcount = 0, control;
- char *ptr, retval = 0;
- unsigned int *oidnotfound;
-
- control = *(tdata->replyctrl);
- send_data(sd, &control, sizeof(char));
-
- //TODO read missing objects during object migration
- /* If response is a soft abort due to missing objects at the
- Participant's side */
-
- /* If the decided response is TRANS_ABORT */
- if(*(tdata->replyctrl) == TRANS_ABORT) {
- retval = TRANS_ABORT;
- } else if(*(tdata->replyctrl) == TRANS_COMMIT) {
- /* If the decided response is TRANS_COMMIT */
- retval = TRANS_COMMIT;
- }
-
- return retval;
-}
-
/* This function opens a connection, places an object read request to
* the remote machine, reads the control message and object if
* available and copies the object and its header to the local
- * cache. */
+ * cache. */
void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
int size, val;
char control;
objheader_t *h;
void *objcopy = NULL;
-
+
int sd = getSock2(transReadSockPool, mnum);
char readrequest[sizeof(char)+sizeof(unsigned int)];
readrequest[0] = READ_REQUEST;
*((unsigned int *)(&readrequest[1])) = oid;
send_data(sd, readrequest, sizeof(readrequest));
-
+
/* Read response from the Participant */
recv_data(sd, &control, sizeof(char));
-
+
if (control==OBJECT_NOT_FOUND) {
objcopy = NULL;
} else {
objcopy = objstrAlloc(record->cache, size);
recv_data(sd, objcopy, size);
/* Insert into cache's lookup table */
- chashInsert(record->lookupTable, oid, objcopy);
+ chashInsert(record->lookupTable, oid, objcopy);
}
-
+
return objcopy;
}
-/* This function handles the local objects involved in a transaction
- * commiting process. It also makes a decision if this local machine
- * sends AGREE or DISAGREE or SOFT_ABORT to coordinator. Note
- * Coordinator = local machine It wakes up the other threads from
- * remote participants that are waiting for the coordinator's decision
- * and based on common agreement it either commits or aborts the
- * transaction. It also frees the memory resources */
-
-void *handleLocalReq(void *threadarg) {
- unsigned int *oidnotfound = NULL, *oidlocked = NULL;
- local_thread_data_array_t *localtdata;
- int numoidnotfound = 0, numoidlocked = 0;
- int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
- int numread, i;
- unsigned int oid;
- unsigned short version;
+/* Commit info for objects modified */
+void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+ int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
void *mobj;
- objheader_t *headptr;
-
- localtdata = (local_thread_data_array_t *) threadarg;
-
- /* Counters and arrays to formulate decision on control message to be sent */
- oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
- oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
-
- numread = localtdata->tdata->buffer->f.numread;
- /* Process each oid in the machine pile/ group per thread */
- for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
- if (i < localtdata->tdata->buffer->f.numread) {
- int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array
- incr *= i;
- oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
- version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
- } else { // Objects Modified
- int tmpsize;
- headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
- if (headptr == NULL) {
- printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
- return NULL;
- }
- oid = OID(headptr);
- version = headptr->version;
- }
- /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
-
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+ /* Save the oids not found and number of oids not found for later use */
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
/* Save the oids not found and number of oids not found for later use */
- if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
- /* Save the oids not found and number of oids not found for later use */
- oidnotfound[numoidnotfound] = oid;
- numoidnotfound++;
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* Check if Obj is locked by any previous transaction */
- if (test_and_set(STATUSPTR(mobj))) {
- if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
- v_matchlock++;
- } else {/* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- }
- } else {
- //we're locked
- /* Save all object oids that are locked on this machine during this transaction request call */
- oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
- numoidlocked++;
- if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
- v_matchnolock++;
- } else { /* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- }
+ oidnotfound[*numoidnotfound] = oid;
+ (*numoidnotfound)++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
+ if (version == ((objheader_t *)mobj)->version) { /* match versions */
+ (*v_matchnolock)++;
+ //Keep track of what is locked
+ oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ *getReplyCtrl = TRANS_DISAGREE;
+
+ //Keep track of what is locked
+ oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
+ return;
+ }
+ } else { //A lock is acquired some place else
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ (*v_matchlock)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ *getReplyCtrl = TRANS_DISAGREE;
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
+ return;
}
}
- } // End for
- /* Condition to send TRANS_AGREE */
- if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
}
- /* Condition to send TRANS_SOFT_ABORT */
- if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
- }
-
- /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
- * if Participant receives a TRANS_COMMIT */
- localtdata->transinfo->objlocked = oidlocked;
- localtdata->transinfo->objnotfound = oidnotfound;
- localtdata->transinfo->modptr = NULL;
- localtdata->transinfo->numlocked = numoidlocked;
- localtdata->transinfo->numnotfound = numoidnotfound;
- /* Lock and update count */
- //Thread sleeps until all messages from pariticipants are received by coordinator
- pthread_mutex_lock(localtdata->tdata->lock);
- (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
-
- /* Wake up the threads and invoke decideResponse (once) */
- if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
- decideResponse(localtdata->tdata);
- pthread_cond_broadcast(localtdata->tdata->threshold);
- } else {
- pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
- }
- pthread_mutex_unlock(localtdata->tdata->lock);
- if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
- if(transAbortProcess(localtdata) != 0) {
- printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
- pthread_exit(NULL);
- }
- } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
- if(transComProcess(localtdata) != 0) {
- printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
- pthread_exit(NULL);
+}
+
+/* Commit info for objects modified */
+void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+ int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
+ void *mobj;
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+ /* Save the oids not found and number of oids not found for later use */
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[*numoidnotfound] = oid;
+ (*numoidnotfound)++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
+ if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
+ (*v_matchnolock)++;
+ //Keep track of what is locked
+ oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ *getReplyCtrl = TRANS_DISAGREE;
+ //Keep track of what is locked
+ oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
+ return;
+ }
+ } else { //Has reached max number of readers or some other transaction
+ //has acquired a lock on this object
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ (*v_matchlock)++;
+ } else { /* If versions don't match ...HARD ABORT */
+ (*v_nomatch)++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ *getReplyCtrl = TRANS_DISAGREE;
+ //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
+ return;
+ }
}
}
- /* Free memory */
- if (localtdata->transinfo->objlocked != NULL) {
- free(localtdata->transinfo->objlocked);
- }
- if (localtdata->transinfo->objnotfound != NULL) {
- free(localtdata->transinfo->objnotfound);
- }
-
- pthread_exit(NULL);
}
/* This function completes the ABORT process if the transaction is aborting */
-int transAbortProcess(local_thread_data_array_t *localtdata) {
+int transAbortProcess(trans_commit_data_t *transinfo) {
int i, numlocked;
unsigned int *objlocked;
void *header;
-
- numlocked = localtdata->transinfo->numlocked;
- objlocked = localtdata->transinfo->objlocked;
-
+
+ numlocked = transinfo->numlocked;
+ objlocked = transinfo->objlocked;
+
+ int useWriteUnlock = 0;
for (i = 0; i < numlocked; i++) {
+ if(objlocked[i] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
if((header = mhashSearch(objlocked[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
- UnLock(STATUSPTR(header));
+ if(!useWriteUnlock) {
+ read_unlock(STATUSPTR(header));
+ } else {
+ write_unlock(STATUSPTR(header));
+ }
}
return 0;
}
-/*This function completes the COMMIT process is the transaction is commiting*/
-int transComProcess(local_thread_data_array_t *localtdata) {
+/*This function completes the COMMIT process if the transaction is commiting*/
+int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec) {
objheader_t *header, *tcptr;
int i, nummod, tmpsize, numcreated, numlocked;
unsigned int *oidmod, *oidcreated, *oidlocked;
void *ptrcreate;
-
- nummod = localtdata->tdata->buffer->f.nummod;
- oidmod = localtdata->tdata->buffer->oidmod;
- numcreated = localtdata->tdata->buffer->f.numcreated;
- oidcreated = localtdata->tdata->buffer->oidcreated;
- numlocked = localtdata->transinfo->numlocked;
- oidlocked = localtdata->transinfo->objlocked;
-
+
+ nummod = tdata->f.nummod;
+ oidmod = tdata->oidmod;
+ numcreated = tdata->f.numcreated;
+ oidcreated = tdata->oidcreated;
+ numlocked = transinfo->numlocked;
+ oidlocked = transinfo->objlocked;
+
for (i = 0; i < nummod; i++) {
if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
/* Copy from transaction cache -> main object store */
- if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
+ if ((tcptr = ((objheader_t *) chashSearch(rec->lookupTable, oidmod[i]))) == NULL) {
printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
GETSIZE(tmpsize, header);
char *tmptcptr = (char *) tcptr;
- memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
+ {
+ struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
+ struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
+ dst->___cachedCode___=src->___cachedCode___;
+ dst->___cachedHash___=src->___cachedHash___;
+
+ memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
+ }
+
header->version += 1;
if(header->notifylist != NULL) {
notifyAll(&header->notifylist, OID(header), header->version);
}
/* If object is newly created inside transaction then commit it */
for (i = 0; i < numcreated; i++) {
- if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
+ if ((header = ((objheader_t *) chashSearch(rec->lookupTable, oidcreated[i]))) == NULL) {
printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
return 1;
}
return 1;
}
pthread_mutex_unlock(&mainobjstore_mutex);
+ /* Initialize read and write locks */
+ initdsmlocks(STATUSPTR(header));
memcpy(ptrcreate, header, tmpsize);
mhashInsert(oidcreated[i], ptrcreate);
lhashInsert(oidcreated[i], myIpAddr);
}
/* Unlock locked objects */
+ int useWriteUnlock = 0;
for(i = 0; i < numlocked; i++) {
+ if(oidlocked[i] == -1) {
+ useWriteUnlock = 1;
+ continue;
+ }
if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
- UnLock(STATUSPTR(header));
+ if(!useWriteUnlock) {
+ read_unlock(STATUSPTR(header));
+ } else {
+ write_unlock(STATUSPTR(header));
+ }
}
-
return 0;
}
prefetchpile_t *foundLocal(char *ptr) {
+ int siteid = *(GET_SITEID(ptr));
int ntuples = *(GET_NTUPLES(ptr));
unsigned int * oidarray = GET_PTR_OID(ptr);
- unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
+ unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
prefetchpile_t * head=NULL;
-
+ int numLocal = 0;
+
int i;
- for(i=0;i<ntuples; i++) {
- unsigned short baseindex=(i==0)?0:endoffsets[i-1];
+ for(i=0; i<ntuples; i++) {
+ unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
unsigned short endindex=endoffsets[i];
unsigned int oid=oidarray[i];
int newbase;
if (oid==0)
continue;
//Look up fields locally
- for(newbase=baseindex;newbase<endindex;newbase++) {
+ for(newbase=baseindex; newbase<endindex; newbase++) {
if (!lookupObject(&oid, arryfields[newbase]))
break;
//Ended in a null pointer...
goto tuple;
}
//Entire prefetch is local
- if (newbase==endindex&&checkoid(oid))
+ if (newbase==endindex&&checkoid(oid)) {
+ numLocal++;
goto tuple;
+ }
//Add to remote requests
machinenum=lhashSearch(oid);
insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
- tuple:
+tuple:
;
}
+
+ /* handle dynamic prefetching */
+ handleDynPrefetching(numLocal, ntuples, siteid);
return head;
}
/* This function is called by the thread calling transPrefetch */
void *transPrefetch(void *t) {
while(1) {
- /* lock mutex of primary prefetch queue */
+ /* read from prefetch queue */
void *node=gettail();
- /* Check if the tuples are found locally, if yes then reduce them further*/
+ /* Check if the tuples are found locally, if yes then reduce them further*/
/* and group requests by remote machine ids by calling the makePreGroups() */
prefetchpile_t *pilehead = foundLocal(node);
if (pilehead!=NULL) {
- // Get sock from shared pool
+ // Get sock from shared pool
int sd = getSock2(transPrefetchSockPool, pilehead->mid);
-
+
/* Send Prefetch Request */
prefetchpile_t *ptr = pilehead;
while(ptr != NULL) {
sendPrefetchReq(ptr, sd);
- ptr = ptr->next;
+ ptr = ptr->next;
}
-
+
/* Release socket */
// freeSock(transPrefetchSockPool, pilehead->mid, sd);
-
+
/* Deallocated pilehead */
mcdealloc(pilehead);
}
void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
objpile_t *tmp;
-
+
int size=sizeof(char)+sizeof(int);
- for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
+ for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
}
-
+
char buft[size];
char *buf=buft;
*buf=TRANS_PREFETCH;
buf+=sizeof(char);
-
- for(tmp=mcpilenode->objpiles;tmp!=NULL;tmp=tmp->next) {
+
+ for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
*((int*)buf)=len;
buf+=sizeof(int);
*((unsigned int *)buf)=tmp->oid;
buf+=sizeof(unsigned int);
- *((unsigned int *)(buf)) = myIpAddr;
+ *((unsigned int *)(buf)) = myIpAddr;
buf+=sizeof(unsigned int);
memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
buf+=tmp->numoffset*sizeof(short);
int len, endpair;
char control;
objpile_t *tmp;
-
+
/* Send TRANS_PREFETCH control message */
control = TRANS_PREFETCH;
send_data(sd, &control, sizeof(char));
-
+
/* Send Oids and offsets in pairs */
tmp = mcpilenode->objpiles;
while(tmp != NULL) {
buf+=sizeof(int);
*((unsigned int *)buf) = tmp->oid;
buf+=sizeof(unsigned int);
- *((unsigned int *)buf) = myIpAddr;
+ *((unsigned int *)buf) = myIpAddr;
buf += sizeof(unsigned int);
memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
send_data(sd, oidnoffset, len);
tmp = tmp->next;
}
-
+
/* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
endpair = -1;
send_data(sd, &endpair, sizeof(int));
-
+
return;
}
char control;
unsigned int oid;
void *modptr, *oldptr;
-
- recv_data((int)sd, &length, sizeof(int));
+
+ recv_data((int)sd, &length, sizeof(int));
size = length - sizeof(int);
char recvbuffer[size];
prehashRemove(oid);
prehashInsert(oid, modptr);
}
- } else {/* Else add the object ptr to hash table*/
+ } else { /* Else add the object ptr to hash table*/
prehashInsert(oid, modptr);
}
/* Lock the Prefetch Cache look up table*/
pthread_mutex_lock(&pflookup.lock);
- /* Broadcast signal on prefetch cache condition variable */
+ /* Broadcast signal on prefetch cache condition variable */
pthread_cond_broadcast(&pflookup.cond);
/* Unlock the Prefetch Cache look up table*/
pthread_mutex_unlock(&pflookup.lock);
oid = *((unsigned int *)(recvbuffer + sizeof(char)));
/* TODO: For each object not found query DHT for new location and retrieve the object */
/* Throw an error */
- printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
+ //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
// exit(-1);
} else {
printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
}
-
+
return 0;
}
objheader_t *objheader;
unsigned short numoffset[] ={0};
short fieldoffset[] ={};
-
+
if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
- if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
- prefetch(1, &oid, numoffset, fieldoffset);
- pthread_mutex_lock(&pflookup.lock);
- while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
- pthread_cond_wait(&pflookup.cond, &pflookup.lock);
- }
- pthread_mutex_unlock(&pflookup.lock);
+#ifdef CACHE
+ if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
+#endif
+ unsigned int mid = lhashSearch(oid);
+ int sd = getSock2(transReadSockPool, mid);
+ char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
+ remotereadrequest[0] = READ_REQUEST;
+ *((unsigned int *)(&remotereadrequest[1])) = oid;
+ send_data(sd, remotereadrequest, sizeof(remotereadrequest));
+
+ /* Read response from the Participant */
+ char control;
+ recv_data(sd, &control, sizeof(char));
+
+ if (control==OBJECT_NOT_FOUND) {
+ printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
+ fflush(stdout);
+ exit(-1);
+ } else {
+ /* Read object if found into local cache */
+ int size;
+ recv_data(sd, &size, sizeof(int));
+#ifdef CACHE
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
+ printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ recv_data(sd, objheader, size);
+ prehashInsert(oid, objheader);
+ return TYPE(objheader);
+#else
+ char *buffer;
+ if((buffer = calloc(1, size)) == NULL) {
+ printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
+ fflush(stdout);
+ return 0;
}
+ recv_data(sd, buffer, size);
+ objheader = (objheader_t *)buffer;
+ unsigned short type = TYPE(objheader);
+ free(buffer);
+ return type;
+#endif
+ }
+#ifdef CACHE
+ }
+#endif
}
-
return TYPE(objheader);
}
-int startRemoteThread(unsigned int oid, unsigned int mid)
-{
- int sock;
- struct sockaddr_in remoteAddr;
- char msg[1 + sizeof(unsigned int)];
- int bytesSent;
- int status;
+int startRemoteThread(unsigned int oid, unsigned int mid) {
+ int sock;
+ struct sockaddr_in remoteAddr;
+ char msg[1 + sizeof(unsigned int)];
+ int bytesSent;
+ int status;
- if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
- {
- perror("startRemoteThread():socket()");
- return -1;
- }
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("startRemoteThread():socket()");
+ return -1;
+ }
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
-
- if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0)
- {
- printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- status = -1;
- }
- else
- {
- msg[0] = START_REMOTE_THREAD;
- *((unsigned int *) &msg[1]) = oid;
- send_data(sock, msg, 1 + sizeof(unsigned int));
- }
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+ printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ status = -1;
+ } else
+ {
+ msg[0] = START_REMOTE_THREAD;
+ *((unsigned int *) &msg[1]) = oid;
+ send_data(sock, msg, 1 + sizeof(unsigned int));
+ }
- close(sock);
- return status;
+ close(sock);
+ return status;
}
//TODO: when reusing oids, make sure they are not already in use!
static unsigned int id = 0xFFFFFFFF;
unsigned int getNewOID(void) {
- id += 2;
- if (id > oidMax || id < oidMin)
- {
- id = (oidMin | 1);
- }
- return id;
+ id += 2;
+ if (id > oidMax || id < oidMin) {
+ id = (oidMin | 1);
+ }
+ return id;
}
-int processConfigFile()
-{
- FILE *configFile;
- const int maxLineLength = 200;
- char lineBuffer[maxLineLength];
- char *token;
- const char *delimiters = " \t\n";
- char *commentBegin;
- in_addr_t tmpAddr;
-
- configFile = fopen(CONFIG_FILENAME, "r");
- if (configFile == NULL)
- {
- printf("error opening %s:\n", CONFIG_FILENAME);
- perror("");
- return -1;
- }
-
- numHostsInSystem = 0;
- sizeOfHostArray = 8;
- hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
-
- while(fgets(lineBuffer, maxLineLength, configFile) != NULL)
- {
- commentBegin = strchr(lineBuffer, '#');
- if (commentBegin != NULL)
- *commentBegin = '\0';
- token = strtok(lineBuffer, delimiters);
- while (token != NULL)
- {
- tmpAddr = inet_addr(token);
- if ((int)tmpAddr == -1)
- {
- printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
- fclose(configFile);
- return -1;
- }
- else
- addHost(htonl(tmpAddr));
- token = strtok(NULL, delimiters);
- }
- }
+int processConfigFile() {
+ FILE *configFile;
+ const int maxLineLength = 200;
+ char lineBuffer[maxLineLength];
+ char *token;
+ const char *delimiters = " \t\n";
+ char *commentBegin;
+ in_addr_t tmpAddr;
+
+ configFile = fopen(CONFIG_FILENAME, "r");
+ if (configFile == NULL) {
+ printf("error opening %s:\n", CONFIG_FILENAME);
+ perror("");
+ return -1;
+ }
+ numHostsInSystem = 0;
+ sizeOfHostArray = 8;
+ hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
+
+ while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
+ commentBegin = strchr(lineBuffer, '#');
+ if (commentBegin != NULL)
+ *commentBegin = '\0';
+ token = strtok(lineBuffer, delimiters);
+ while (token != NULL) {
+ tmpAddr = inet_addr(token);
+ if ((int)tmpAddr == -1) {
+ printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
fclose(configFile);
-
- if (numHostsInSystem < 1)
- {
- printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
- return -1;
- }
+ return -1;
+ } else
+ addHost(htonl(tmpAddr));
+ token = strtok(NULL, delimiters);
+ }
+ }
+
+ fclose(configFile);
+
+ if (numHostsInSystem < 1) {
+ printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
+ return -1;
+ }
#ifdef MAC
- myIpAddr = getMyIpAddr("en1");
+ myIpAddr = getMyIpAddr("en1");
#else
- myIpAddr = getMyIpAddr("eth0");
+ myIpAddr = getMyIpAddr("eth0");
#endif
- myIndexInHostArray = findHost(myIpAddr);
- if (myIndexInHostArray == -1)
- {
- printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
- return -1;
- }
- oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
- oidMin = oidsPerBlock * myIndexInHostArray;
- if (myIndexInHostArray == numHostsInSystem - 1)
- oidMax = 0xFFFFFFFF;
- else
- oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
+ myIndexInHostArray = findHost(myIpAddr);
+ if (myIndexInHostArray == -1) {
+ printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
+ return -1;
+ }
+ oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
+ oidMin = oidsPerBlock * myIndexInHostArray;
+ if (myIndexInHostArray == numHostsInSystem - 1)
+ oidMax = 0xFFFFFFFF;
+ else
+ oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
- return 0;
+ return 0;
}
-void addHost(unsigned int hostIp)
-{
- unsigned int *tmpArray;
+void addHost(unsigned int hostIp) {
+ unsigned int *tmpArray;
- if (findHost(hostIp) != -1)
- return;
+ if (findHost(hostIp) != -1)
+ return;
- if (numHostsInSystem == sizeOfHostArray)
- {
- tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
- memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
- free(hostIpAddrs);
- hostIpAddrs = tmpArray;
- }
+ if (numHostsInSystem == sizeOfHostArray) {
+ tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
+ memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
+ free(hostIpAddrs);
+ hostIpAddrs = tmpArray;
+ }
- hostIpAddrs[numHostsInSystem++] = hostIp;
+ hostIpAddrs[numHostsInSystem++] = hostIp;
- return;
+ return;
}
-int findHost(unsigned int hostIp)
-{
- int i;
- for (i = 0; i < numHostsInSystem; i++)
- if (hostIpAddrs[i] == hostIp)
- return i;
+int findHost(unsigned int hostIp) {
+ int i;
+ for (i = 0; i < numHostsInSystem; i++)
+ if (hostIpAddrs[i] == hostIp)
+ return i;
- //not found
- return -1;
+ //not found
+ return -1;
}
-/* This function sends notification request per thread waiting on object(s) whose version
+/* This function sends notification request per thread waiting on object(s) whose version
* changes */
int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
int sock,i;
pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
notifydata_t *ndata;
-
+
oid = oidarry[0];
if((mid = lhashSearch(oid)) == 0) {
printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
return;
}
-
- if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("reqNotify():socket()");
return -1;
}
-
+
bzero(&remoteAddr, sizeof(remoteAddr));
remoteAddr.sin_family = AF_INET;
remoteAddr.sin_port = htons(LISTEN_PORT);
remoteAddr.sin_addr.s_addr = htonl(mid);
-
+
/* Generate unique threadid */
threadid++;
-
+
/* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
if((status = notifyhashInsert(threadid, ndata)) != 0) {
printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
free(ndata);
- return -1;
+ return -1;
}
-
- /* Send number of oids, oidarry, version array, machine id and threadid */
- if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+
+ /* Send number of oids, oidarry, version array, machine id and threadid */
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
printf("reqNotify():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
free(ndata);
return -1;
} else {
*((unsigned int *)(&msg[1])) = numoid;
/* Send array of oids */
size = sizeof(unsigned int);
- {
- i = 0;
- while(i < numoid) {
- oid = oidarry[i];
- *((unsigned int *)(&msg[1] + size)) = oid;
- size += sizeof(unsigned int);
- i++;
- }
+
+ for(i = 0;i < numoid; i++) {
+ oid = oidarry[i];
+ *((unsigned int *)(&msg[1] + size)) = oid;
+ size += sizeof(unsigned int);
}
-
+
/* Send array of version */
- {
- i = 0;
- while(i < numoid) {
- version = versionarry[i];
- *((unsigned short *)(&msg[1] + size)) = version;
- size += sizeof(unsigned short);
- i++;
- }
+ for(i = 0;i < numoid; i++) {
+ version = versionarry[i];
+ *((unsigned short *)(&msg[1] + size)) = version;
+ size += sizeof(unsigned short);
}
-
- *((unsigned int *)(&msg[1] + size)) = myIpAddr;
- size += sizeof(unsigned int);
+
+ *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
*((unsigned int *)(&msg[1] + size)) = threadid;
pthread_mutex_lock(&(ndata->threadnotify));
size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
pthread_mutex_unlock(&(ndata->threadnotify));
}
-
+
pthread_cond_destroy(&threadcond);
pthread_mutex_destroy(&threadnotify);
free(ndata);
}
void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
- notifydata_t *ndata;
- int i, objIsFound = 0, index;
- void *ptr;
-
- //Look up the tid and call the corresponding pthread_cond_signal
- if((ndata = notifyhashSearch(tid)) == NULL) {
- printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
- return;
- } else {
- for(i = 0; i < ndata->numoid; i++) {
- if(ndata->oidarry[i] == oid){
- objIsFound = 1;
- index = i;
- }
- }
- if(objIsFound == 0){
- printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
- return;
- } else {
- if(version <= ndata->versionarry[index]){
- printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__);
- return;
- } else {
- /* Clear from prefetch cache and free thread related data structure */
- if((ptr = prehashSearch(oid)) != NULL) {
- prehashRemove(oid);
- }
- pthread_cond_signal(&(ndata->threadcond));
- }
- }
- }
+ notifydata_t *ndata;
+ int i, objIsFound = 0, index;
+ void *ptr;
+
+ //Look up the tid and call the corresponding pthread_cond_signal
+ if((ndata = notifyhashSearch(tid)) == NULL) {
+ printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else {
+ for(i = 0; i < ndata->numoid; i++) {
+ if(ndata->oidarry[i] == oid) {
+ objIsFound = 1;
+ index = i;
+ }
+ }
+ if(objIsFound == 0) {
+ printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else {
+ if(version <= ndata->versionarry[index]) {
+ printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
return;
+ } else {
+#ifdef CACHE
+ /* Clear from prefetch cache and free thread related data structure */
+ if((ptr = prehashSearch(oid)) != NULL) {
+ prehashRemove(oid);
+ }
+#endif
+ pthread_mutex_lock(&(ndata->threadnotify));
+ pthread_cond_signal(&(ndata->threadcond));
+ pthread_mutex_unlock(&(ndata->threadnotify));
+ }
+ }
+ }
+ return;
}
int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
struct sockaddr_in remoteAddr;
char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
int sock, status, size, bytesSent;
-
+
while(*head != NULL) {
ptr = *head;
- mid = ptr->mid;
+ mid = ptr->mid;
//create a socket connection to that machine
- if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("notifyAll():socket()");
return -1;
}
-
+
bzero(&remoteAddr, sizeof(remoteAddr));
remoteAddr.sin_family = AF_INET;
remoteAddr.sin_port = htons(LISTEN_PORT);
remoteAddr.sin_addr.s_addr = htonl(mid);
//send Thread Notify response and threadid to that machine
- if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
printf("notifyAll():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- status = -1;
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
fflush(stdout);
+ status = -1;
} else {
bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
msg[0] = THREAD_NOTIFY_RESPONSE;
*((unsigned short *)(&msg[1]+ size)) = version;
size+= sizeof(unsigned short);
*((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
-
+
size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
send_data(sock, msg, size);
}
}
void transAbort(transrecord_t *trans) {
+#ifdef ABORTREADERS
+ removetransactionhash(trans->lookupTable, trans);
+#endif
objstrDelete(trans->cache);
chashDelete(trans->lookupTable);
free(trans);
}
+
+/* This function inserts necessary information into
+ * a machine pile data structure */
+plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
+ plistnode_t *ptr, *tmp;
+ int found = 0, offset = 0;
+
+ tmp = pile;
+ //Add oid into a machine that is already present in the pile linked list structure
+ while(tmp != NULL) {
+ if (tmp->mid == mid) {
+ int tmpsize;
+
+ if (STATUS(headeraddr) & NEW) {
+ tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
+ tmp->numcreated++;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else if (STATUS(headeraddr) & DIRTY) {
+ tmp->oidmod[tmp->nummod] = OID(headeraddr);
+ tmp->nummod++;
+ GETSIZE(tmpsize, headeraddr);
+ tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+ *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
+ offset += sizeof(unsigned int);
+ *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
+ tmp->numread++;
+ }
+ found = 1;
+ break;
+ }
+ tmp = tmp->next;
+ }
+ //Add oid for any new machine
+ if (!found) {
+ int tmpsize;
+ if((ptr = pCreate(num_objs)) == NULL) {
+ return NULL;
+ }
+ ptr->mid = mid;
+ if (STATUS(headeraddr) & NEW) {
+ ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
+ ptr->numcreated++;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else if (STATUS(headeraddr) & DIRTY) {
+ ptr->oidmod[ptr->nummod] = OID(headeraddr);
+ ptr->nummod++;
+ GETSIZE(tmpsize, headeraddr);
+ ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
+ } else {
+ *((unsigned int *)ptr->objread)=OID(headeraddr);
+ offset = sizeof(unsigned int);
+ *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
+ ptr->numread++;
+ }
+ ptr->next = pile;
+ pile = ptr;
+ }
+
+ /* Clear Flags */
+ STATUS(headeraddr) =0;
+
+
+ return pile;
+}
+
+plistnode_t *sortPiles(plistnode_t *pileptr) {
+ plistnode_t *head, *ptr, *tail;
+ head = pileptr;
+ ptr = pileptr;
+ /* Get tail pointer */
+ while(ptr!= NULL) {
+ tail = ptr;
+ ptr = ptr->next;
+ }
+ ptr = pileptr;
+ plistnode_t *prev = pileptr;
+ /* Arrange local machine processing at the end of the pile list */
+ while(ptr != NULL) {
+ if(ptr != tail) {
+ if(ptr->mid == myIpAddr && (prev != pileptr)) {
+ prev->next = ptr->next;
+ ptr->next = NULL;
+ tail->next = ptr;
+ return pileptr;
+ }
+ if((ptr->mid == myIpAddr) && (prev == pileptr)) {
+ prev = ptr->next;
+ ptr->next = NULL;
+ tail->next = ptr;
+ return prev;
+ }
+ prev = ptr;
+ }
+ ptr = ptr->next;
+ }
+ return pileptr;
+}