}
int prefetchReq(int acceptfd) {
- int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size;
+ int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
unsigned int oid, index = 0;
char *ptr, buffer[PRE_BUF_SIZE];
void *mobj;
- unsigned int *oidnotfound, objoid;
- char *header;
+ unsigned int objoid;
+ char *header, control;
objheader_t * head;
/* Repeatedly recv the oid and offset pairs sent for prefetch */
while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
+ count++;
if(length == -1)
break;
sum = 0;
- index = 0;
+ index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the
+ // first 4 bytes are saved to send the
+ // size of the buffer (that is computed at the end of the loop)
oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
N = numoffset * sizeof(short);
memcpy(buffer+index, &oid, sizeof(unsigned int));
index += sizeof(unsigned int);
} else { /* If Obj found in machine (i.e. has not moved) */
- /* Return the oid ..its header and data */
+ /* send the oid, it's size, it's header and data */
header = (char *) mobj;
head = (objheader_t *) header;
size = sizeof(objheader_t) + sizeof(classsize[head->type]);
index += sizeof(char);
memcpy(buffer+index, &oid, sizeof(unsigned int));
index += sizeof(unsigned int);
+ memcpy(buffer+index, &size, sizeof(int));
+ index += sizeof(int);
memcpy(buffer + index, header, size);
index += size;
/* Calculate the oid corresponding to the offset value */
for(i = 0 ; i< numoffset ; i++) {
objoid = *((int *)(header + sizeof(objheader_t) + offset[i]));
if((header = (char *) mhashSearch(objoid)) == NULL) {
- /* Obj not found, send oid and its offsets */
+ /* Obj not found, send oid */
*(buffer + index) = OBJECT_NOT_FOUND;
index += sizeof(char);
memcpy(buffer+index, &oid, sizeof(unsigned int));
index += sizeof(unsigned int);
break;
} else {/* Obj Found */
+ /* send the oid, it's size, it's header and data */
head = (objheader_t *) header;
size = sizeof(objheader_t) + sizeof(classsize[head->type]);
*(buffer + index) = OBJECT_FOUND;
index += sizeof(char);
memcpy(buffer+index, &oid, sizeof(unsigned int));
index += sizeof(unsigned int);
+ memcpy(buffer+index, &size, sizeof(int));
+ index += sizeof(int);
memcpy(buffer + index, header, size);
index += size;
continue;
printf("Char buffer is overflowing\n");
return 1;
}
- /* Send the buffer with all oids found and not found */
+ /* Send Prefetch response control message only once*/
+ if(count == 1) {
+ control = TRANS_PREFETCH_RESPONSE;
+ if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
+ perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
+ return 1;
+ }
+ }
+
+ /* Send the buffer size */
+ memcpy(buffer, &index, sizeof(unsigned int));
+ /* Send the entire buffer with its size and oids found and not found */
if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) {
- perror("Error sending size of object\n");
+ perror("Error sending oids found\n");
return 1;
}
}
-
return 0;
}
prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
//Create prefetch cache lookup table
if(prehashCreate(HASH_SIZE, LOADFACTOR))
- return; //Failure
+ return; //Failure
//Initialize primary shared queue
queueInit();
//Initialize machine pile w/prefetch oids and offsets shared queue
transrecord_t *tmp = malloc(sizeof(transrecord_t));
tmp->cache = objstrCreate(1048576);
tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
-
+
return tmp;
}
void *objcopy;
int size;
void *buf;
- /* Search local cache */
+ /* Search local cache */
if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
/* Look up in machine lookup table and copy into cache*/
-// tmp = mhashSearch(oid);
+ // tmp = mhashSearch(oid);
size = sizeof(objheader_t)+classsize[tmp->type];
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)objheader, size);
}
next = curr->next;
//Get machine location for object id
-
+
if ((machinenum = lhashSearch(curr->key)) == 0) {
- printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
- return NULL;
+ printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
+ return NULL;
}
if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
printf("pInsert error %s, %d\n", __FILE__, __LINE__);
return NULL;
}
-
+
/* Check if local or not */
if((localmachinenum = mhashSearch(curr->key)) != NULL) {
pile->local = 1; //True i.e. local
}
-
+
curr = next;
}
}
/* Count the number of participants */
pilecount = pCount(pile);
-
+
/* Create a list of machine ids(Participants) involved in transaction */
if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
return 1;
}
pListMid(pile, listmid);
-
+
/* Initialize thread variables,
* Spawn a thread for each Participant involved in a transaction */
pthread_cond_t tcond;
pthread_mutex_t tlock;
pthread_mutex_t tlshrd;
-
+
thread_data_array_t *thread_data_array;
thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
local_thread_data_array_t *ltdata;
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_mutex_init(&tlock, NULL);
pthread_cond_init(&tcond, NULL);
-
+
/* Process each machine pile */
while(pile != NULL) {
//Create transaction id
return 1;
}
}
-
+
/* Free resources */
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
/* Retry the commiting transaction again */
transCommit(record);
}
-
+
return 0;
}
perror("Error in connect for TRANS_REQUEST\n");
return NULL;
}
-
+
printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
/* Send bytes of data with TRANS_REQUEST control message */
if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
}
/* Send list of machines involved in the transaction */
{
- int size=sizeof(unsigned int)*tdata->pilecount;
- if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
- perror("Error sending list of machines for thread\n");
- return NULL;
- }
+ int size=sizeof(unsigned int)*tdata->pilecount;
+ if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
+ perror("Error sending list of machines for thread\n");
+ return NULL;
+ }
}
/* Send oids and version number tuples for objects that are read */
{
- int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
- if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
- perror("Error sending tuples for thread\n");
- return NULL;
- }
+ int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
+ if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
+ perror("Error sending tuples for thread\n");
+ return NULL;
+ }
}
/* Send objects that are modified */
for(i = 0; i < tdata->buffer->f.nummod ; i++) {
- int size;
- headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
- size=sizeof(objheader_t)+classsize[headeraddr->type];
- if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
- perror("Error sending obj modified for thread\n");
- return NULL;
- }
+ int size;
+ headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+ size=sizeof(objheader_t)+classsize[headeraddr->type];
+ if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
+ perror("Error sending obj modified for thread\n");
+ return NULL;
+ }
}
/* Read control message from Participant */
return NULL;
}
recvcontrol = control;
-
+
/* Update common data structure and increment count */
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
transagree++;
break;
-
+
case TRANS_SOFT_ABORT:
printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
transsoftabort++;
return -1;
}
}
-
+
/* Decide what control message to send to Participant */
if(transdisagree > 0) {
/* Send Abort */
printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
return -1;
}
-
+
return 0;
}
/* This function sends the final response to all threads in their respective socket id */
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
printf("DEBUG -> Sending TRANS_DISAGREE\n");
- // return tdata->recvmsg[tdata->thread_id].rcv_status;
+ // return tdata->recvmsg[tdata->thread_id].rcv_status;
}
}
}
printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
/* Send number of oids not found and the missing oids if objects are missing in the machine */
/* TODO Remember to store the oidnotfound for later use
- if(objnotfound != 0) {
- int size = sizeof(unsigned int)* objnotfound;
- }
- */
+ if(objnotfound != 0) {
+ int size = sizeof(unsigned int)* objnotfound;
+ }
+ */
}
/* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
/*Set flag to show that common data structure for this individual thread has been written to */
//*(tdata->localstatus) |= LM_UPDATED;
-
+
/* Lock and update count */
//Thread sleeps until all messages from pariticipants are received by coordinator
pthread_mutex_lock(localtdata->tdata->lock);
free(localtdata->transinfo->objnotfound);
localtdata->transinfo->objnotfound = NULL;
}
-
+
pthread_exit(NULL);
}
/* This function completes the ABORT process if the transaction is aborting
- */
+*/
int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) {
char *ptr;
int i;
}
/*This function completes the COMMIT process is the transaction is commiting
- */
- int transComProcess(trans_commit_data_t *transinfo) {
- objheader_t *header;
- int i = 0, offset = 0;
- char control;
-
- printf("DEBUG -> Recv TRANS_COMMIT\n");
- /* Process each modified object saved in the mainobject store */
- for(i=0; i<transinfo->nummod; i++) {
- if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
- printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- }
- /* Change reference count of older address and free space in objstr ?? */
- header->rcount = 1; //TODO Not sure what would be the val
-
- /* Change ptr address in mhash table */
- mhashRemove(transinfo->objmod[i]);
- mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
- offset += sizeof(objheader_t) + classsize[header->type];
-
- /* Update object version number */
- header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
- header->version += 1;
- }
-
- /* Unlock locked objects */
- for(i=0; i<transinfo->numlocked; i++) {
- header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
- header->status &= ~(LOCK);
- }
-
- //TODO Update location lookup table
- //TODO/* Unset the bit for local objects */
-
- /* Send ack to Coordinator */
- printf("DEBUG-> TRANS_SUCESSFUL\n");
- return 0;
- }
+*/
+int transComProcess(trans_commit_data_t *transinfo) {
+ objheader_t *header;
+ int i = 0, offset = 0;
+ char control;
+
+ printf("DEBUG -> Recv TRANS_COMMIT\n");
+ /* Process each modified object saved in the mainobject store */
+ for(i=0; i<transinfo->nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ }
+ /* Change reference count of older address and free space in objstr ?? */
+ header->rcount = 1; //TODO Not sure what would be the val
+
+ /* Change ptr address in mhash table */
+ mhashRemove(transinfo->objmod[i]);
+ mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+ offset += sizeof(objheader_t) + classsize[header->type];
+
+ /* Update object version number */
+ header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+ header->version += 1;
+ }
+
+ /* Unlock locked objects */
+ for(i=0; i<transinfo->numlocked; i++) {
+ header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+ header->status &= ~(LOCK);
+ }
+
+ //TODO Update location lookup table
+ //TODO/* Unset the bit for local objects */
+
+ /* Send ack to Coordinator */
+ printf("DEBUG-> TRANS_SUCESSFUL\n");
+ return 0;
+}
/* This function checks if the prefetch oids are same and have same offsets
* for case x.a.b and y.a.b where x and y have same oid's
short *endoffsets, *arryfields;
/* Check for the case x.y.z and a.b.c are same oids */
- ptr = (char *) node;
+ ptr = (char *) node;
ntuples = *(GET_NTUPLES(ptr));
oid = GET_PTR_OID(ptr);
endoffsets = GET_PTR_EOFF(ptr, ntuples);
prefetchpile_t *head = NULL;
/* Check for the case x.y.z and a.b.c are same oids */
- ptr = (char *) node;
+ ptr = (char *) node;
ntuples = *(GET_NTUPLES(ptr));
oid = GET_PTR_OID(ptr);
endoffsets = GET_PTR_EOFF(ptr, ntuples);
short *endoffsets, *arryfields;
prefetchpile_t *head = NULL;
- ptr = (char *) node;
+ ptr = (char *) node;
ntuples = *(GET_NTUPLES(ptr));
oid = GET_PTR_OID(ptr);
endoffsets = GET_PTR_EOFF(ptr, ntuples);
} else
flag = 0;
}
-
+
/*If all offset oids are found locally,make the prefetch tuple invalid */
if(flag == 0) {
oid[i] = -1;
/* Check if the tuples are found locally, if yes then reduce them further*/
/* and group requests by remote machine ids by calling the makePreGroups() */
pilehead = foundLocal(qnode);
-
+
/* Lock mutex of pool queue */
pthread_mutex_lock(&mcqueue.qlock);
/* Update the pool queue with the new remote machine piles generated per prefetch call */
pthread_mutex_unlock(&mcqueue.qlock);
/*Initiate connection to remote host and send request */
- sendPrefetchReq(mcpilenode, tid);
/* Process Request */
-
-
+ sendPrefetchReq(mcpilenode, tid);
/* TODO: For each object not found query DHT for new location and retrieve the object */
-
+
/* Deallocate the dequeued node */
}
}
pthread_t thread[numoids];
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-
+
/* Create Machine Piles to send prefetch requests use threads*/
for( i = 0 ; i< numoids ; i++) {
if(arrayofoffset[i][0] == -1)
}
}
pthread_attr_destroy(&attr);
-
+
return 0;
-
+
}
void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
- int sd, i, offset, off, len, endpair;
+ int sd, i, offset, off, len, endpair, numoffsets, count = 0;
struct sockaddr_in serv_addr;
struct hostent *server;
char machineip[16], control;
tmp = mcpilenode->objpiles;
while(tmp != NULL) {
off = offset = 0;
+ count++; // Keeps track of the number of oid and offset tuples sent per remote machine
len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
char oidnoffset[len];
memcpy(oidnoffset, &len, sizeof(int));
tmp = tmp->next;
}
+ /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
endpair = -1;
if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
perror("Error sending fixed bytes for thread\n");
}
/* Get Response from the remote machine */
- getPrefetchResponse();
-
-// close(sd);
+ getPrefetchResponse(count,sd);
+ close(sd);
}
-void getPrefetchResponse() {
- int i;
-
- /* Lock the Prefetch Cache look up table*/
- pthread_mutex_lock(&pflookup.lock);
-
- /*TODO For each object found add to Prefetch Cache */
-
- /* Broadcast signal on prefetch cache condition variable */
- pthread_cond_broadcast(&pflookup.qcond);
-
- /* Unlock the Prefetch Cache look up table*/
- pthread_mutex_unlock(&pflookup.lock);
-
-
-
-
+void getPrefetchResponse(int count, int sd) {
+ int i = 0, val, n, N, sum, index, objsize;
+ unsigned int bufsize,oid;
+ char buffer[RECEIVE_BUFFER_SIZE], control;
+ char *ptr;
+ void *modptr;
+ /* Read prefetch response from the Remote machine */
+ if((val = read(sd, &control, sizeof(char))) <= 0) {
+ perror("No control response for Prefetch request sent\n");
+ return;
+ }
+ if(control == TRANS_PREFETCH_RESPONSE) {
+ /*For each oid and offset tuple sent as prefetch request to remote machine*/
+ while(i < count) {
+ /* Clear contents of buffer */
+ memset(buffer, 0, RECEIVE_BUFFER_SIZE);
+ sum = 0;
+ index = 0;
+ /* Read the size of buffer to be received */
+ if((N = read(sd, buffer, sizeof(unsigned int))) <= 0) {
+ perror("Size of buffer not recv\n");
+ return;
+ }
+ memcpy(&bufsize, buffer, sizeof(unsigned int));
+ ptr = buffer + sizeof(unsigned int);
+ /* Keep receiving the buffer containing oid info */
+ do {
+ n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0);
+ sum +=n;
+ } while(sum < bufsize && n != 0);
+ /* Decode the contents of the buffer */
+ index = sizeof(unsigned int);
+ while(index < (bufsize - sizeof(unsigned int))) {
+ if(buffer[index] == OBJECT_FOUND) {
+ /* Increment it to get the object */
+ index += sizeof(char);
+ memcpy(&oid, buffer + index, sizeof(unsigned int));
+ index += sizeof(unsigned int);
+ /* Lock the Prefetch Cache look up table*/
+ pthread_mutex_lock(&pflookup.lock);
+ /* For each object found add to Prefetch Cache */
+ memcpy(&objsize, buffer + index, sizeof(int));
+ if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
+ printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+ memcpy(modptr, buffer+index, objsize);
+ index += sizeof(int);
+ /* Add pointer and oid to hash table */
+ //TODO Do we need a version comparison herei ??
+ prehashInsert(oid, modptr);
+ /* Broadcast signal on prefetch cache condition variable */
+ pthread_cond_broadcast(&pflookup.cond);
+ /* Unlock the Prefetch Cache look up table*/
+ pthread_mutex_unlock(&pflookup.lock);
+ } else if(buffer[index] == OBJECT_NOT_FOUND) {
+ /* Increment it to get the object */
+ // TODO If object not found, local machine takes inventory
+ index += sizeof(char);
+ memcpy(&oid, buffer + index, sizeof(unsigned int));
+ index += sizeof(unsigned int);
+ } else
+ printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__);
+ }
+ i++;
+ }
+ } else
+ printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);
}