11 #include<netinet/in.h>
13 #define LISTEN_PORT 2156
14 #define MACHINE_IP "127.0.0.1"
15 #define RECEIVE_BUFFER_SIZE 2048
17 extern int classsize[];
// transStart: begins a transaction by allocating a transrecord_t and attaching
// an object-store cache (objstrCreate) and a lookup table (chashCreate) to it.
// NOTE(review): this listing omits some original lines (braces and presumably
// the return statement), so what is returned is not visible here — confirm.
19 transrecord_t *transStart()
// NOTE(review): the malloc result is dereferenced on the very next line
// without a NULL check.
21 transrecord_t *tmp = malloc(sizeof(transrecord_t));
// 1048576 == 1 << 20; presumably the cache size in bytes — confirm against objstrCreate.
22 tmp->cache = objstrCreate(1048576);
23 tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
// transRead: looks up object `oid` for the transaction `record`, trying three
// places in order:
//   1. the transaction's own lookup table (chashSearch),
//   2. the machine lookup table (mhashSearch) — on a hit the object is copied
//      into the transaction cache and registered in the lookup table,
//   3. a remote machine chosen by lhashSearch, fetched with getRemoteObj.
// NOTE(review): several original lines (declarations of `size`/`objcopy`,
// braces, return statements) are absent from this listing, so the exact
// return value on each path cannot be confirmed from here.
27 objheader_t *transRead(transrecord_t *record, unsigned int oid)
29 // printf("Enter TRANS_READ\n");
30 unsigned int machinenumber;
31 objheader_t *tmp, *objheader;
// Case 1: the object is already in this transaction's lookup table.
36 if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
// NOTE(review): oid is unsigned int but is printed with %d here and below.
37 printf("transRead oid %d found local\n %s, %d", oid, __FILE__, __LINE__);
// Case 2: the object is in the machine lookup table.
39 } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
40 //Look up in Machine lookup table and found
42 //printf("oid is found in Local machinelookup\n");
// NOTE(review): this repeats the mhashSearch(oid) call whose result is
// already held in objheader from the else-if condition above.
43 tmp = mhashSearch(oid);
// Copy size = header plus the per-type payload size from classsize[].
44 size = sizeof(objheader_t)+classsize[tmp->type];
46 objcopy = objstrAlloc(record->cache, size);
47 memcpy(objcopy, (void *)tmp, size);
48 //Insert into cache's lookup table
49 chashInsert(record->lookupTable, objheader->oid, objcopy);
// Case 3: ask the machine returned by lhashSearch for the object.
52 //Get the object from the remote location
53 //printf("oid is found in remote machine\n");
54 machinenumber = lhashSearch(oid);
55 objcopy = getRemoteObj(record, machinenumber, oid);
57 //If object is not found in Remote location
58 printf("Object oid = %d not found in Machine %d at %s, %d\n", oid, machinenumber, __FILE__, __LINE__);
62 printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
// transCreateObj: allocates a new object of class `type` inside the
// transaction cache, assigns it a fresh oid from getNewOID(), zeroes its
// rcount, and registers it in the transaction's lookup table under that oid.
// NOTE(review): other header-field assignments and the return statement are
// on lines not present in this listing — confirm.
68 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
// Allocation size = header plus the per-type payload size from classsize[].
// NOTE(review): the objstrAlloc result is written through on the next line
// without a NULL check.
70 objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
71 tmp->oid = getNewOID();
74 tmp->rcount = 0; //? not sure how to handle this yet
77 chashInsert(record->lookupTable, tmp->oid, tmp);
// decideResponse: examines the status received from every participant
// (tdata->recvmsg[0..pilecount-1].rcv_status), tallies the outcomes, and
// writes a single control byte back to the participant on socket `sd`.
//   tdata - per-thread data; also carries the shared response array.
//   sd    - connected socket to this thread's participant.
//   val   - 0 on the first call; the original comments say 1 marks the
//           second (recursive) call.
// The caller in this file treats a non-zero return as an error.
// NOTE(review): the switch statement, several case labels, counter
// increments, `ctrl` assignments, and return statements are on lines that
// are absent from this listing; comments below describe only what is visible.
80 //int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
81 int decideResponse(thread_data_array_t *tdata, int sd, int val) {
82 int i, n, N, sum, retval, oidcount = 0;
83 int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
84 char ctrl, control, *ptr;
85 unsigned int *oidnotfound;
88 // printf("DEBUG -> pilecount is %d\n", tdata->pilecount);
89 //Check common data structure
// Pass over every participant's recorded status.
90 for (i = 0 ; i < tdata->pilecount ; i++) {
92 control = tdata->recvmsg[i].rcv_status;
95 // printf("DEBUG-> Inside TRANS_DISAGREE\n");
97 //Free transaction records
98 objstrDelete(tdata->rec->cache);
99 chashDelete(tdata->rec->lookupTable);
// NOTE(review): this loop uses the same index `i` as the status loop above;
// if it is nested inside that loop (the nesting is not visible here) it
// clobbers the outer counter. It also writes to the same `sd` each iteration.
103 for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
104 if (write(sd, &ctrl, sizeof(char)) < 0) {
105 perror("Error sending ctrl message for participant\n");
112 printf("Inside TRANS_AGREE\n");
117 case TRANS_SOFT_ABORT:
118 printf("Inside TRANS_SOFT_ABORT\n");
120 /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
121 if ((i == tdata->thread_id) && (val == 0)) {
122 //Read list of objects missing
// NOTE(review): read() returns -1 on error, which also satisfies `!= 0`,
// so a failed read is treated like a successful one here.
123 if(read(sd, &oidcount, sizeof(int)) != 0) {
124 //Break if only objs are locked at the Participant side
128 transsoftabortmiss++;
// N = number of bytes expected for the list of missing oids.
129 N = oidcount * sizeof(unsigned int);
130 if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
131 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
133 ptr = (char *) oidnotfound;
// Read loop that fills oidnotfound until N bytes arrive or read() returns 0.
// NOTE(review): the initialisation and update of `sum` are on lines not
// shown; confirm a negative `n` (read error) cannot be added to `sum`.
135 n = read(sd, ptr+sum, N-sum);
137 } while(sum < N && n !=0);
142 printf("Participant sent unknown message\n");
146 //Decide what control message to send to Participant
// Every participant agreed: per the log message below, TRANS_COMMIT is sent
// (the assignment to ctrl is on a line not shown).
147 if(transagree == tdata->pilecount){
150 printf("Sending TRANS_COMMIT accept_fd = %d\n", sd);
151 if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
152 perror("Error sending ctrl message for participant\n");
155 //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
// Soft aborts only, no disagreement and no missing objects: tell the
// participant to abort but retry the commit, then read its new status.
158 if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
159 //Send abort but retry commit
160 ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
161 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT acceptfd = %d\n", sd);
162 if((retval = write(sd, &ctrl, sizeof(char))) <= 0) {
163 perror("Error sending ctrl message for participant\n");
166 //Sleep and then resend the request
168 //Read new control message from Participant
170 if((n = read(sd, &control, sizeof(char))) <= 0) {
171 perror("No bytes are read for participant\n");
175 //Update common data structure and increment count
176 tdata->recvmsg[tdata->thread_id].rcv_status = control;
// NOTE(review): the recursive call's return value is discarded, and `val`
// is passed through unchanged on this line; the trailing comment says the
// second call is marked by val = 1 — confirm val is set on a line not shown.
178 decideResponse(tdata, sd, val); //Second call to decideResponse(); indicated by parameter val = 1
// Soft aborts with at least one participant reporting missing objects.
181 if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
182 //Send abort but retry commit after relooking up objects
184 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
185 if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
186 perror("Error sending ctrl message for participant\n");
191 //update location table
// transRequest: pthread entry point run once per participant machine.
// `threadarg` is a thread_data_array_t *. The thread:
//   1. opens a TCP connection to the participant identified by tdata->mid,
//   2. sends the TRANS_REQUEST packet in four parts (fixed header, machine
//      list, read-set tuples, modified objects),
//   3. reads the participant's one-byte status into the shared recvmsg array,
//   4. synchronises with the other threads on tdata->lock / tdata->threshold,
//   5. calls decideResponse() to answer the participant.
// NOTE(review): declarations of sd, i, n and machineip, the shared-count
// increment, the error-path bodies and the return statement are on lines
// absent from this listing.
200 void *transRequest(void *threadarg) {
202 struct sockaddr_in serv_addr;
203 struct hostent *server;
204 thread_data_array_t *tdata;
205 objheader_t *headeraddr;
206 //unsigned int *oidnotfound;
// NOTE(review): `buffer` is not referenced on any line visible here.
207 char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
210 tdata = (thread_data_array_t *) threadarg;
212 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
213 perror("Error in socket for TRANS_REQUEST");
216 bzero((char*) &serv_addr, sizeof(serv_addr));
217 serv_addr.sin_family = AF_INET;
218 serv_addr.sin_port = htons(LISTEN_PORT);
219 //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
// Convert the machine id to a dotted-quad string, then force termination
// at index 15 (presumably machineip is char[16] — declaration not shown).
220 midtoIP(tdata->mid,machineip);
221 machineip[15] = '\0';
222 serv_addr.sin_addr.s_addr = inet_addr(machineip);
224 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
225 perror("Error in connect for TRANS_REQUEST");
229 //Multiple writes for sending packets of data
230 //Send first few fixed bytes of the TRANS_REQUEST protocol
// NOTE(review): each write() below is checked only for < 0; a short write
// (fewer bytes than requested) is not detected or retried.
231 printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
232 // printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
233 // printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
234 if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
235 perror("Error sending fixed bytes for thread");
238 //Send list of machines involved in the transaction
239 // printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
240 if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
241 perror("Error sending list of machines for thread");
244 //Send oids and version number tuples for objects that are read
245 // printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
246 // printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
// Each read-set tuple is sizeof(unsigned int) + sizeof(short) bytes.
247 if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
248 perror("Error sending tuples for thread");
251 //Send objects that are modified
252 for(i = 0; i < tdata->buffer->f.nummod ; i++) {
// NOTE(review): headeraddr is dereferenced on the next line without
// checking whether chashSearch returned NULL.
253 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
254 if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) {
255 perror("Error sending obj modified for thread");
260 //Read message control message from participant side
261 if((n = read(sd, &control, sizeof(char))) <= 0) {
262 perror("Error in reading control message from Participant\n");
265 recvcontrol = control;
267 //Update common data structure and increment count
// Each thread writes only its own slot (index thread_id) of the shared array.
268 tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
269 //Lock and update count
270 //Thread sleeps until all messages from participants are received by coordinator
271 pthread_mutex_lock(tdata->lock);
// The thread that sees the count reach pilecount wakes the others.
274 if(*(tdata->count) == tdata->pilecount) {
275 pthread_cond_broadcast(tdata->threshold);
// NOTE(review): the wait appears to be issued once rather than inside a
// loop that re-tests the count, so a spurious wakeup would let the thread
// proceed early — confirm against the lines not shown.
277 pthread_cond_wait(tdata->threshold, tdata->lock);
280 //process the participant's request
// NOTE(review): as far as the visible lines show, decideResponse() (which
// does socket I/O) runs while tdata->lock is still held, serialising the
// threads — confirm this is intended.
281 if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
282 printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
283 pthread_mutex_unlock(tdata->lock);
286 pthread_mutex_unlock(tdata->lock);
// transCommit: commits the transaction described by `record`.
// Visible steps:
//   1. walk every bin of record->lookupTable and group the cached objects
//      into per-machine piles (lhashSearch gives the machine, pInsert
//      builds the piles),
//   2. build one trans_req_data_t per pile and start one transRequest
//      thread per pile, all sharing a mutex, a condition variable, a
//      response array and a receive counter,
//   3. join all threads and destroy the mutex and condition variable.
// NOTE(review): declarations of i, rc and attr, the pile-iteration loop
// header, error-path bodies, cleanup and the return value are on lines
// absent from this listing.
292 int transCommit(transrecord_t *record) {
293 chashlistnode_t *curr, *ptr, *next;
294 unsigned int size;//Represents number of bins in the chash table
295 unsigned int machinenum, tot_bytes_mod, *listmid;
296 objheader_t *headeraddr;
297 plistnode_t *tmp, *pile = NULL;
299 int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
300 char buffer[RECEIVE_BUFFER_SIZE],control;
301 char transid[TID_LEN];
// Static counter used in the transaction id string built further down.
302 static int newtid = 0;
303 trans_req_data_t *tosend;
305 ptr = record->lookupTable->table;
306 size = record->lookupTable->size;
307 //Look through all the objects in the cache and make piles
308 for(i = 0; i < size ;i++) {
310 //Inner loop to traverse the linked list of the cache lookupTable
311 while(curr != NULL) {
312 //if the first bin in hash table is empty
317 //Get machine location for object id
// A result of 0 from lhashSearch is treated as "no such machine".
319 if ((machinenum = lhashSearch(curr->key)) == 0) {
320 printf("Error: No such machine\n");
324 //TODO only for debug
326 if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
327 printf("Error: No such oid\n");
330 //Make machine groups
331 if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
332 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
339 //Create the packet to be sent in TRANS_REQUEST
341 pilecount = pCount(pile); //Keeps track of the number of participants
343 //Thread related variables
// Variable-length array sized by the number of piles.
344 pthread_t thread[pilecount]; //Create threads for each participant
346 pthread_cond_t tcond;
347 pthread_mutex_t tlock;
// NOTE(review): tlshrd is not referenced on any line visible here.
348 pthread_mutex_t tlshrd;
349 //thread_data_array_t thread_data_array[pilecount];
350 thread_data_array_t *thread_data_array;
// NOTE(review): no NULL check for this malloc is visible, and no matching
// free is visible in this listing — confirm.
352 thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
354 thread_response_t rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants
356 //Initialize and set thread detach attribute
357 pthread_attr_init(&attr);
358 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
359 pthread_mutex_init(&tlock, NULL);
360 pthread_cond_init(&tcond, NULL);
362 //Keep track of list of machine ids per transaction
363 if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
364 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
368 pListMid(pile, listmid);
369 //Process each machine group
371 //Create transaction id
373 //trans_req_data_t *tosend;
// One request descriptor is allocated per pile; `tmp` is presumably the
// current pile node (the loop header is on a line not shown).
374 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
375 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
378 tosend->f.control = TRANS_REQUEST;
// NOTE(review): sprintf is unbounded; confirm trans_id is large enough for
// "<hex mid>_<decimal newtid>" (presumably TID_LEN bytes).
379 sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid);
380 tosend->f.mcount = pilecount;
381 tosend->f.numread = tmp->numread;
382 tosend->f.nummod = tmp->nummod;
383 tosend->f.sum_bytes = tmp->sum_bytes;
384 tosend->listmid = listmid;
385 tosend->objread = tmp->objread;
386 tosend->oidmod = tmp->oidmod;
// Fill this thread's argument slot; the recvmsg, threshold, lock and count
// members point at objects shared by all threads of this commit.
387 thread_data_array[numthreads].thread_id = numthreads;
388 thread_data_array[numthreads].mid = tmp->mid;
389 thread_data_array[numthreads].pilecount = pilecount;
390 thread_data_array[numthreads].buffer = tosend;
391 thread_data_array[numthreads].recvmsg = rcvd_control_msg;
392 thread_data_array[numthreads].threshold = &tcond;
393 thread_data_array[numthreads].lock = &tlock;
394 thread_data_array[numthreads].count = &trecvcount;
395 thread_data_array[numthreads].rec = record;
// NOTE(review): `attr` is initialised above but NULL (default attributes)
// is passed here, so the attribute object is never used by pthread_create.
397 rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);
399 perror("Error in pthread create");
409 // Free attribute and wait for the other threads
410 pthread_attr_destroy(&attr);
411 for (i = 0 ;i < pilecount ; i++) {
412 rc = pthread_join(thread[i], NULL);
415 printf("ERROR return code from pthread_join() is %d\n", rc);
421 pthread_cond_destroy(&tcond);
422 pthread_mutex_destroy(&tlock);
423 // for(i = 0 ;i< pilecount ;i++) {
// getRemoteObj: fetches object `oid` from remote machine `mnum` over TCP and
// stores it in the transaction cache.
//   record - transaction whose cache and lookup table receive the object.
//   mnum   - machine id, converted to an IP string by midtoIP.
//   oid    - id of the object requested.
// Wire exchange visible here: send READ_REQUEST byte + oid; read one control
// byte; unless it is OBJECT_NOT_FOUND, read an int size and then `size`
// bytes of object data.
// NOTE(review): declarations of sd, val, size, control, objcopy and
// machineip, the switch header, other case labels, return statements and
// any close(sd) are on lines absent from this listing — confirm the socket
// is closed on every path.
431 //mnum will be used to represent the machine IP address later
432 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
434 struct sockaddr_in serv_addr;
435 struct hostent *server;
441 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
442 perror("Error in socket\n");
445 bzero((char*) &serv_addr, sizeof(serv_addr));
446 serv_addr.sin_family = AF_INET;
447 serv_addr.sin_port = htons(LISTEN_PORT);
448 //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
449 midtoIP(mnum,machineip);
450 machineip[15] = '\0';
451 serv_addr.sin_addr.s_addr = inet_addr(machineip);
453 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
454 perror("Error in connect\n");
// Request layout: byte 0 = READ_REQUEST, bytes 1..sizeof(unsigned int) = oid
// in host byte order.
457 char readrequest[sizeof(char)+sizeof(unsigned int)];
458 readrequest[0] = READ_REQUEST;
// NOTE(review): this stores an unsigned int through a cast pointer at
// offset 1 of a char array, which is not guaranteed to be suitably aligned;
// memcpy would avoid that.
459 *((unsigned int *)(&readrequest[1])) = oid;
460 if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
461 perror("Error sending message\n");
466 printf("DEBUG -> ready to rcv ...\n");
468 //Read response from the Participant
469 if((val = read(sd, &control, sizeof(char))) <= 0) {
470 perror("No control response for getRemoteObj sent\n");
474 case OBJECT_NOT_FOUND:
475 printf("DEBUG -> Control OBJECT_NOT_FOUND received\n");
// NOTE(review): the size and object reads below are single read() calls
// checked only for <= 0; a short read would leave the data incomplete.
478 if((val = read(sd, &size, sizeof(int))) <= 0) {
479 perror("No size is read from the participant\n");
482 objcopy = objstrAlloc(record->cache, size);
483 if((val = read(sd, objcopy, size)) <= 0) {
484 perror("No objects are read from the remote participant\n");
487 //Insert into cache's lookup table
488 chashInsert(record->lookupTable, oid, objcopy);
491 printf("Error in recv request from participant on a READ_REQUEST\n");