#include<sys/socket.h>
#include<netdb.h>
#include<netinet/in.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <time.h>
#define LISTEN_PORT 2156
#define MACHINE_IP "127.0.0.1"
extern int classsize[];
+void randomdelay(void)
+{
+ struct timespec req, rem;
+ time_t t;
+
+ t = time(NULL);
+ req.tv_sec = 0;
+ req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
+ nanosleep(&req, &rem);
+ return;
+}
+
transrecord_t *transStart()
{
transrecord_t *tmp = malloc(sizeof(transrecord_t));
objheader_t *transRead(transrecord_t *record, unsigned int oid)
{
- printf("Enter TRANS_READ\n");
unsigned int machinenumber;
objheader_t *tmp, *objheader;
void *objcopy;
void *buf;
//check cache
if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
- printf("DEBUG -> transRead oid %d found local\n", oid);
+ //printf("DEBUG -> transRead oid %d found local\n", oid);
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
//Look up in Machine lookup table and found
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
//If object is not found in Remote location
- printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
+ //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
return NULL;
}
else {
- printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
+ //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
return(objcopy);
}
}
chashInsert(record->lookupTable, tmp->oid, tmp);
return tmp;
}
-//int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
-int decideResponse(thread_data_array_t *tdata, int sd, int val) {
- int i, n, N, sum, retval, oidcount = 0;
- int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
- char ctrl, control, *ptr;
- unsigned int *oidnotfound;
- objheader_t *header;
- printf("DEBUG -> pilecount is %d\n", tdata->pilecount);
+//This function decides the reponse that needs to be sent to all other machines involved in a
+//transaction by the machine that initiated the transaction request
+
+int decideResponse(thread_data_array_t *tdata) {
+ char control;
+ int i, transagree = 0, transdisagree = 0, transsoftabort = 0;
+
//Check common data structure
for (i = 0 ; i < tdata->pilecount ; i++) {
//Switch case
control = tdata->recvmsg[i].rcv_status;
switch(control) {
case TRANS_DISAGREE:
- printf("DEBUG-> Inside TRANS_DISAGREE\n");
+ printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
transdisagree++;
- //Free transaction records
- objstrDelete(tdata->rec->cache);
- chashDelete(tdata->rec->lookupTable);
- free(tdata->rec);
- //send Abort
- ctrl = TRANS_ABORT;
- for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
- if (send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error sending ctrl message for participant\n");
- return 1;
- }
- }
- return 0;
+ break;
case TRANS_AGREE:
- printf("DEBUG-> Inside TRANS_AGREE\n");
- PRINT_TID(tdata);
+ printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
transagree++;
break;
case TRANS_SOFT_ABORT:
- printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
+ printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
transsoftabort++;
- /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
- if ((i == tdata->thread_id) && (val == 0)) {
- //Read list of objects missing
- if(read(sd, &oidcount, sizeof(int)) != 0) {
- //Break if only objs are locked at the Participant side
- if (oidcount == 0) {
- break;
- }
- transsoftabortmiss++;
- N = oidcount * sizeof(unsigned int);
- if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- }
- ptr = (char *) oidnotfound;
- do {
- n = read(sd, ptr+sum, N-sum);
- sum += n;
- } while(sum < N && n !=0);
- }
- }
-
break;
default:
- printf("Participant sent unknown message\n");
+ printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
+ return -1;
}
}
//Decide what control message to send to Participant
- if(transagree == tdata->pilecount){
+ if(transdisagree > 0) {
+ //Send Abort
+ *(tdata->replyctrl) = TRANS_ABORT;
+ printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
+ objstrDelete(tdata->rec->cache);
+ chashDelete(tdata->rec->lookupTable);
+ free(tdata->rec);
+ } else if(transagree == tdata->pilecount){
//Send Commit
- ctrl = TRANS_COMMIT;
- printf("Sending TRANS_COMMIT\n");
- if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error sending ctrl message for participant\n");
- return 1;
- }
- //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
+ *(tdata->replyctrl) = TRANS_COMMIT;
+ printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
+ objstrDelete(tdata->rec->cache);
+ chashDelete(tdata->rec->lookupTable);
+ free(tdata->rec);
+ } else if(transsoftabort > 0 && transdisagree == 0) {
+ //Send Abort
+ *(tdata->replyctrl) = TRANS_ABORT;
+ *(tdata->replyretry) = 1;
+ //objstrDelete(tdata->rec->cache);
+ //chashDelete(tdata->rec->lookupTable);
+ //free(tdata->rec);
+ printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
+ } else {
+ printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
+ return -1;
}
+
+ return 0;
+}
+//This function sends the final response to all threads in their respective socket id
+char sendResponse(thread_data_array_t *tdata, int sd) {
+ int n, N, sum, oidcount = 0;
+ char *ptr, retval = 0;
+ unsigned int *oidnotfound;
- if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
- //Send abort but retry commit
- ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
- printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
- if((retval = send(sd, &ctrl, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error sending ctrl message for participant\n");
- return 1;
- }
- /*
- //Sleep and the resend the request
- sleep(2);
- //Read new control message from Participant
-
- if((n = read(sd, &control, sizeof(char))) <= 0) {
- perror("No bytes are read for participant\n");
- return 1;
+ //If the decided response is due to a soft abort and missing objects at the Participant's side
+ if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
+ //Read list of objects missing
+ if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
+ //Break if only objs are locked at the Participant side
+ N = oidcount * sizeof(unsigned int);
+ if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ }
+ ptr = (char *) oidnotfound;
+ do {
+ n = read(sd, ptr+sum, N-sum);
+ sum += n;
+ } while(sum < N && n !=0);
}
-
- //Update common data structure and increment count
- tdata->recvmsg[tdata->thread_id].rcv_status = control;
- val = 1;
- decideResponse(tdata, sd, val); //Second call to decideResponse(); indicated by parameter val = 1
- */
+ retval = TRANS_SOFT_ABORT;
}
-
- if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
- //Send abort but retry commit after relooking up objects
- ctrl = TRANS_ABORT;
- printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
- if((retval = send(sd, &ctrl, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error sending ctrl message for participant\n");
- return 1;
- }
- //TODO
- //Relook up objects
- //update location table
-
- //Free pointers
- free(oidnotfound);
+ //If the decided response is TRANS_ABORT
+ if(*(tdata->replyctrl) == TRANS_ABORT) {
+ retval = TRANS_ABORT;
}
-
- return 0;
+ if(*(tdata->replyctrl) == TRANS_COMMIT) {
+ retval = TRANS_COMMIT;
+ }
+ // Send response to the Participant
+ if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
+ perror("Error sending ctrl message for participant\n");
+ }
+
+ return retval;
}
void *transRequest(void *threadarg) {
struct hostent *server;
thread_data_array_t *tdata;
objheader_t *headeraddr;
- //unsigned int *oidnotfound;
char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
- char machineip[16];
+ char machineip[16], retval;
tdata = (thread_data_array_t *) threadarg;
//Send Trans Request
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("Error in socket for TRANS_REQUEST");
+ perror("Error in socket for TRANS_REQUEST\n");
return NULL;
}
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(LISTEN_PORT);
- //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
midtoIP(tdata->mid,machineip);
machineip[15] = '\0';
serv_addr.sin_addr.s_addr = inet_addr(machineip);
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
- perror("Error in connect for TRANS_REQUEST");
+ perror("Error in connect for TRANS_REQUEST\n");
return NULL;
}
-
- //Multiple writes for sending packets of data
- //Send first few fixed bytes of the TRANS_REQUEST protocol
- printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
-// printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
-// printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
+
+ printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
+ //Send bytes of data with TRANS_REQUEST control message
if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
- perror("Error sending fixed bytes for thread");
+ perror("Error sending fixed bytes for thread\n");
return NULL;
}
//Send list of machines involved in the transaction
-// printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
{
int size=sizeof(unsigned int)*tdata->pilecount;
if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
- perror("Error sending list of machines for thread");
+ perror("Error sending list of machines for thread\n");
return NULL;
}
}
//Send oids and version number tuples for objects that are read
-// printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
-// printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
{
int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
- perror("Error sending tuples for thread");
+ perror("Error sending tuples for thread\n");
return NULL;
}
}
headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
size=sizeof(objheader_t)+classsize[headeraddr->type];
if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
- perror("Error sending obj modified for thread");
+ perror("Error sending obj modified for thread\n");
return NULL;
}
}
-
+
//Read message control message from participant side
if((n = read(sd, &control, sizeof(char))) <= 0) {
perror("Error in reading control message from Participant\n");
//Lock and update count
//Thread sleeps until all messages from pariticipants are received by coordinator
pthread_mutex_lock(tdata->lock);
- (*(tdata->count))++;
+
+ (*(tdata->count))++;
if(*(tdata->count) == tdata->pilecount) {
+ if (decideResponse(tdata) != 0) {
+ printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(tdata->lock);
+ close(sd);
+ return NULL;
+ }
pthread_cond_broadcast(tdata->threshold);
} else {
pthread_cond_wait(tdata->threshold, tdata->lock);
}
- //process the participant's request
- if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
- printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(tdata->lock);
+
+ if (sendResponse(tdata, sd) == 0) {
+ printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
pthread_mutex_unlock(tdata->lock);
+ close(sd);
return NULL;
}
- pthread_mutex_unlock(tdata->lock);
-
close(sd);
pthread_exit(NULL);
}
char transid[TID_LEN];
static int newtid = 0;
trans_req_data_t *tosend;
+ char treplyctrl = 0, treplyretry = 0; //keep track of the common response that needs to be sent
ptr = record->lookupTable->table;
size = record->lookupTable->size;
//Get machine location for object id
if ((machinenum = lhashSearch(curr->key)) == 0) {
- printf("Error: No such machine\n");
+ printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
return 1;
}
- //TODO only for debug
- //machinenum = 1;
if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
- printf("Error: No such oid\n");
+ printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
return 1;
}
//Make machine groups
pListMid(pile, listmid);
//Process each machine group
+ //Should be a new function for while loop
while(tmp != NULL) {
//Create transaction id
newtid++;
thread_data_array[numthreads].threshold = &tcond;
thread_data_array[numthreads].lock = &tlock;
thread_data_array[numthreads].count = &trecvcount;
+ thread_data_array[numthreads].replyctrl = &treplyctrl;
+ thread_data_array[numthreads].replyretry = &treplyretry;
thread_data_array[numthreads].rec = record;
rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);
perror("Error in pthread create");
return 1;
}
-
numthreads++;
//TODO frees
- //free(tosend);
tmp = tmp->next;
}
//Free resources
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
-// for(i = 0 ;i< pilecount ;i++) {
- free(tosend);
-// }
+
+ free(tosend);
free(listmid);
pDelete(pile);
+ if(treplyretry == 1) {
+ //wait a random amount of time
+ randomdelay();
+ //sleep(1);
+ //Retry the commiting transaction again
+ transCommit(record);
+ }
+
return 0;
}
chashInsert(record->lookupTable, oid, objcopy);
break;
default:
- printf("Error in recv request from participant on a READ_REQUEST\n");
+ printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
return NULL;
}
close(sd);