return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
//Look up in Machine lookup table and found
- printf("oid not found in local cache\n");
+ printf("oid is found in Local mlookup\n");
tmp = mhashSearch(oid);
size = sizeof(objheader_t)+classsize[tmp->type];
//Copy into cache
return(objcopy);
} else {
//Get the object from the remote location
- //printf("oid not found in local machine lookup\n");
- printf("machinenumber = %d\n",machinenumber);
- printf("oid = %d\n",oid);
+ printf("oid is found in remote machine\n");
machinenumber = lhashSearch(oid);
- printf("machinenumber = %d\n",machinenumber);
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
//If object is not found in Remote location
}
//int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
int decideResponse(thread_data_array_t *tdata, int sd, int val) {
- int i, n, N, sum, oidcount = 0;
+ int i, n, N, sum, retval, oidcount = 0;
int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
char ctrl, control, *ptr;
unsigned int *oidnotfound;
objheader_t *header;
+
//Check common data structure
for (i = 0 ; i < tdata->pilecount ; i++) {
free(tdata->rec);
//send Abort
ctrl = TRANS_ABORT;
- if (write(sd, &ctrl, sizeof(char)) < 0) {
- perror("Error sending ctrl message for participant\n");
- return 1;
+ for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
+ if (write(sd, &ctrl, sizeof(char)) < 0) {
+ perror("Error sending ctrl message for participant\n");
+ return 1;
+ }
}
return 0;
//Send Commit
ctrl = TRANS_COMMIT;
printf("Sending TRANS_COMMIT\n");
- if (write(sd, &ctrl, sizeof(char)) < 0) {
+ if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
perror("Error sending ctrl message for participant\n");
return 1;
}
+ //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
}
if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
//Send abort but retry commit
ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
- if (write(sd, &ctrl, sizeof(char)) < 0) {
+ if((retval = write(sd, &ctrl, sizeof(char))) <= 0) {
perror("Error sending ctrl message for participant\n");
return 1;
}
- //Sleep
+ //Sleep and the resend the request
sleep(5);
//Read new control message from Participant
- n = read(sd, &control, sizeof(char));
+
+ if((n = read(sd, &control, sizeof(char))) <= 0) {
+ perror("No bytes are read for participant\n");
+ return 1;
+ }
//Update common data structure and increment count
tdata->recvmsg[tdata->thread_id].rcv_status = control;
}
if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
- //Send abort but retry commit after relloking up objects
- //ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING;
+ //Send abort but retry commit after relooking up objects
ctrl = TRANS_ABORT;
printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
- if (write(sd, &ctrl, sizeof(char)) < 0) {
+ if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
perror("Error sending ctrl message for participant\n");
return 1;
}
//TODO
//Relook up objects
//update location table
+
//Free pointers
free(oidnotfound);
}
char machineip[16];
tdata = (thread_data_array_t *) threadarg;
- printf("DEBUG -> New thread id %d\n", tdata->thread_id);
//Send Trans Request
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("Error in socket for TRANS_REQUEST");
midtoIP(tdata->mid,machineip);
machineip[15] = '\0';
serv_addr.sin_addr.s_addr = inet_addr(machineip);
- //serv_addr.sin_addr.s_addr = inet_addr(tdata->mid);
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
perror("Error in connect for TRANS_REQUEST");
}
//Send oids and version number tuples for objects that are read
// printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
-// printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
+// printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
perror("Error sending tuples for thread");
return NULL;
//Send objects that are modified
for(i = 0; i < tdata->buffer->f.nummod ; i++) {
headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
-// printf("DEBUG -> Bytes sent for oid = %d modified %d\n", *((int *)headeraddr), sizeof(objheader_t) + classsize[headeraddr->type]);
if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) {
perror("Error sending obj modified for thread");
return NULL;
}
//Read message control message from participant side
- n = read(sd, &control, sizeof(char));
+ if((n = read(sd, &control, sizeof(char))) <= 0) {
+ perror("Error in reading control message from Participant\n");
+ return NULL;
+ }
recvcontrol = control;
- printf("DEBUG -> After TRANS_REQUEST, message control recv is %d\n", recvcontrol);
//Update common data structure and increment count
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
if(*(tdata->count) == tdata->pilecount) {
pthread_cond_broadcast(tdata->threshold);
- //process the participant's request
- if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
- printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(tdata->lock);
- return NULL;
- }
} else {
pthread_cond_wait(tdata->threshold, tdata->lock);
}
+
+ //process the participant's request
+ if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
+ printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(tdata->lock);
+ return NULL;
+ }
pthread_mutex_unlock(tdata->lock);
close(sd);
char buffer[RECEIVE_BUFFER_SIZE],control;
char transid[TID_LEN];
static int newtid = 0;
+ trans_req_data_t *tosend;
ptr = record->lookupTable->table;
size = record->lookupTable->size;
pthread_cond_t tcond;
pthread_mutex_t tlock;
pthread_mutex_t tlshrd;
- thread_data_array_t thread_data_array[pilecount];
+ //thread_data_array_t thread_data_array[pilecount];
+ thread_data_array_t *thread_data_array;
+
+ thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
thread_response_t rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants
pListMid(pile, listmid);
//Process each machine group
while(tmp != NULL) {
- printf("DEBUG -> Created thread %d... \n", numthreads);
//Create transaction id
newtid++;
- trans_req_data_t *tosend;
+ //trans_req_data_t *tosend;
if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
return 1;
perror("Error in pthread create");
return 1;
}
+
numthreads++;
//TODO frees
- free(tosend);
+ //free(tosend);
tmp = tmp->next;
}
//Free resources
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
+// for(i = 0 ;i< pilecount ;i++) {
+ free(tosend);
+// }
free(listmid);
pDelete(pile);
return 0;
//mnun will be used to represent the machine IP address later
void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
- int sd, size;
+ int sd, size, val;
struct sockaddr_in serv_addr;
struct hostent *server;
char control;
void *objcopy;
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("Error in socket");
+ perror("Error in socket\n");
return NULL;
}
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_addr.s_addr = inet_addr(machineip);
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
- perror("Error in connect");
+ perror("Error in connect\n");
return NULL;
}
char readrequest[sizeof(char)+sizeof(unsigned int)];
readrequest[0] = READ_REQUEST;
*((unsigned int *)(&readrequest[1])) = oid;
if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
- perror("Error sending message");
+ perror("Error sending message\n");
return NULL;
}
printf("DEBUG -> ready to rcv ...\n");
#endif
//Read response from the Participant
- read(sd, &control, sizeof(char));
+ if((val = read(sd, &control, sizeof(char))) <= 0) {
+ perror("No control response for getRemoteObj sent\n");
+ return NULL;
+ }
switch(control) {
case OBJECT_NOT_FOUND:
return NULL;
break;
case OBJECT_FOUND:
- read(sd, &size, sizeof(int));
+ if((val = read(sd, &size, sizeof(int))) <= 0) {
+ perror("No size is read from the participant\n");
+ return NULL;
+ }
objcopy = objstrAlloc(record->cache, size);
- read(sd, objcopy, size);
+ if((val = read(sd, objcopy, size)) <= 0) {
+ perror("No objects are read from the remote participant\n");
+ return NULL;
+ }
//Insert into cache's lookup table
chashInsert(record->lookupTable, oid, objcopy);
break;
printf("Error in recv request from participant on a READ_REQUEST\n");
return NULL;
}
+ close(sd);
return objcopy;
}