11 #define LISTEN_PORT 2156
12 #define BACKLOG 10 //max pending connections
13 #define RECEIVE_BUFFER_SIZE 2048
15 extern int classsize[];
17 objstr_t *mainobjstore;
21 //Initialize main object store
22 mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
23 if (mhashCreate(HASH_SIZE, LOADFACTOR))
26 if (lhashCreate(HASH_SIZE, LOADFACTOR))
29 //pthread_t threadListen;
30 //pthread_create(&threadListen, NULL, dstmListen, NULL);
37 int listenfd, acceptfd;
38 struct sockaddr_in my_addr;
39 struct sockaddr_in client_addr;
40 socklen_t addrlength = sizeof(struct sockaddr);
41 pthread_t thread_dstm_accept;
44 listenfd = socket(AF_INET, SOCK_STREAM, 0);
51 my_addr.sin_family = AF_INET;
52 my_addr.sin_port = htons(LISTEN_PORT);
53 my_addr.sin_addr.s_addr = INADDR_ANY;
54 memset(&(my_addr.sin_zero), '\0', 8);
56 if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
62 if (listen(listenfd, BACKLOG) == -1)
68 printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
71 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
72 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
77 void *dstmAccept(void *acceptfd)
79 int numbytes,i, val, retval;
81 char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
85 trans_commit_data_t transinfo;
87 int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
89 printf("Recieved connection: fd = %d\n", (int)acceptfd);
90 if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
91 perror("Error in receiving control from coordinator\n");
96 printf("DEBUG -> Recv READ_REQUEST from Coordinator\n");
97 if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
98 perror("Error receiving object from cooridnator\n");
101 srcObj = mhashSearch(oid);
102 h = (objheader_t *) srcObj;
103 size = sizeof(objheader_t) + sizeof(classsize[h->type]);
105 ctrl = OBJECT_NOT_FOUND;
106 if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
107 perror("Error sending control msg to coordinator\n");
110 //char responsemessage[sizeof(char)+sizeof(int)];
113 if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
114 perror("Error sending control msg to coordinator\n");
117 //responsemessage[0]=OBJECT_FOUND;
119 //*((int *)(&responsemessage[1])) = sizeof(objheader_t) + classsize[h->type];
120 //if(send((int)acceptfd, &responsemessage, sizeof(responsemessage), 0) < 0) {
121 // perror("Error sending control msg to coordinator\n");
125 if(send((int)acceptfd, &size, sizeof(int), 0) < 0) {
126 perror("Error sending size of object to coordinator\n");
128 if(send((int)acceptfd, h, size, 0) < 0) {
129 perror("Error in sending object\n");
134 case READ_MULT_REQUEST:
135 printf("DEBUG-> READ_MULT_REQUEST\n");
139 printf("DEBUG -> MOVE_REQUEST\n");
142 case MOVE_MULT_REQUEST:
143 printf("DEBUG -> MOVE_MULT_REQUEST\n");
147 printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
148 if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
149 printf("Error in readClientReq\n");
154 printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
156 if (close((int)acceptfd) == -1)
161 printf("Closed connection: fd = %d\n", (int)acceptfd);
165 printf("DEBUG -> Exiting dstmAccept\n");
168 int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
169 char *ptr, control, prevctrl, sendctrl, newctrl;
170 void *modptr, *header;
171 objheader_t *tmp_header;
173 int sum = 0, i, N, n, val, retval;
175 //Reads to process the TRANS_REQUEST protocol further
177 N = sizeof(fixed) - 1;
178 ptr = (char *)&fixed;;
179 fixed.control = TRANS_REQUEST;
181 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
182 // printf("DEBUG -> 1. Reading %d bytes \n", n);
184 } while(sum < N && n != 0);
186 //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
188 int mcount = fixed.mcount;
189 N = mcount * sizeof(unsigned int);
190 unsigned int listmid[mcount];
191 ptr = (char *) listmid;
194 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
195 // printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
197 } while(sum < N && n != 0);
199 // Read oid and version tuples
200 int numread = fixed.numread;
201 N = numread * (sizeof(unsigned int) + sizeof(short));
203 if(numread != 0) { // If pile contains objects to be read
204 // N = numread * (sizeof(unsigned int) + sizeof(short));
208 n = recv((int)acceptfd, (void *) objread, N, 0);
209 // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
211 } while(sum < N && n != 0);
212 // printf("DEBUG -> Recv objs from Coordinator %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
215 // Read modified objects
216 if(fixed.nummod != 0) { // If pile contains modified objects
217 if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
218 printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
222 do { // Recv the objs that are modified at Coordinator
223 n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
224 // printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
226 } while (sum < fixed.sum_bytes && n != 0);
229 //Send control message as per all votes from all oids in the machine
230 if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
231 printf("Handle req error\n");
234 //Read for new control message from Coordiator
235 if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
236 perror("Error in receiving control message");
242 printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
243 //send ack to coordinator
244 sendctrl = TRANS_SUCESSFUL;
245 if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
246 perror("Error sending ACK to coordinator\n");
249 //Mark all ref counts as 1 and do garbage collection
251 for(i = 0; i< fixed.nummod; i++) {
252 tmp_header = (objheader_t *)ptr;
253 tmp_header->rcount = 1;
254 ptr += sizeof(objheader_t) + classsize[tmp_header->type];
256 //Unlock objects that was locked in this machine due to this transaction
257 for(i = 0; i< transinfo->numlocked; i++) {
258 header = mhashSearch(transinfo->objlocked[i]);// find the header address
259 ((objheader_t *)header)->status &= ~(LOCK);
265 printf("DEBUG -> Recv TRANS_COMMIT from Coordinator\n");
266 if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
267 printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
270 case TRANS_ABORT_BUT_RETRY_COMMIT:
271 printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
272 //Process again after waiting for sometime and on prev control message sent
275 sendctrl = TRANS_AGREE;
276 if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
277 perror("Error sending ACK to coordinator\n");
281 case TRANS_SOFT_ABORT:
282 if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
283 printf("Handle req error\n");
285 if(newctrl == prevctrl){
287 newctrl = TRANS_DISAGREE;
288 if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
289 perror("Error sending ACK to coordinator\n");
291 //Set the reference count of the object to 1 in mainstore for garbage collection
293 for(i = 0; i< fixed.nummod; i++) {
294 tmp_header = (objheader_t *) ptr;
295 tmp_header->rcount = 1;
296 ptr += sizeof(objheader_t) + classsize[tmp_header->type];
298 //Unlock objects that was locked in this machine due to this transaction
299 for(i = 0; i< transinfo->numlocked; i++) {
300 ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
301 ((objheader_t *)ptr)->status &= ~(LOCK);
305 //Send new control message
306 if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
307 perror("Error sending ACK to coordinator\n");
315 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
316 //TODO expect another transrequest from client
317 printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
320 printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
321 //TODO Use fixed.trans_id TID since Client may have died
325 printf("DEBUG -> Freeing...");
327 if (transinfo->objmod != NULL) {
328 free(transinfo->objmod);
329 transinfo->objmod = NULL;
331 if (transinfo->objlocked != NULL) {
332 free(transinfo->objlocked);
333 transinfo->objlocked = NULL;
335 if (transinfo->objnotfound != NULL) {
336 free(transinfo->objnotfound);
337 transinfo->objnotfound = NULL;
342 //This function runs a decision after all objects are weighed under one of the 4 possibilities
343 //and returns the appropriate control message to the Ccordinator
344 char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
347 char control = 0, ctrlmissoid, *ptr;
350 unsigned int *oidnotfound, *oidlocked, *oidmod;
352 oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
353 oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
354 oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
355 // Counters and arrays to formulate decision on control message to be sent
356 int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
357 int objmodnotfound = 0, nummodfound = 0;
359 objheader_t *headptr;
361 //Process each object present in the pile
363 //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
365 //Process each oid in the machine pile/ group
366 for (i = 0; i < fixed->numread + fixed->nummod; i++) {
367 if (i < fixed->numread) {//Object is read
368 int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
370 oid = *((unsigned int *)(objread + incr));
371 incr += sizeof(unsigned int);
372 version = *((short *)(objread + incr));
373 } else {//Obj is modified
374 headptr = (objheader_t *) ptr;
376 oidmod[objmod] = oid;//Array containing modified oids
378 version = headptr->version;
379 ptr += sizeof(objheader_t) + classsize[headptr->type];
381 //Check if object is still present in the machine since the beginning of TRANS_REQUEST
382 if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
383 //Save the oids not found for later use
384 oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
386 } else { // If obj found in machine (i.e. has not moved)
387 //Check if obj is locked
388 if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {
389 if (version == ((objheader_t *)mobj)->version) { // If version match
391 } else {//If versions don't match ..HARD ABORT
393 //send TRANS_DISAGREE to Coordinator
394 control = TRANS_DISAGREE;
395 if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
396 perror("Error in sending control to the Coordinator\n");
399 printf("DEBUG -> Sending TRANS_DISAGREE\n");
402 } else {//Obj is not locked , so lock object
403 ((objheader_t *)mobj)->status |= LOCK;
404 //Save all object oids that are locked on this machine during this transaction request call
405 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
406 printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid);
408 if (version == ((objheader_t *)mobj)->version) { //If versions match
410 } else { //If versions don't match
412 //send TRANS_DISAGREE to Coordinator
413 control = TRANS_DISAGREE;
414 if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
415 perror("Error in sending control to the Coordinator\n");
418 printf("DEBUG -> Sending TRANS_DISAGREE\n");
425 printf("No of objs locked = %d\n", objlocked);
426 printf("No of v_nomatch = %d\n", v_nomatch);
427 printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
428 printf("No of objs v_match but had locks before = %d\n", v_matchlock);
429 printf("No of objs not found = %d\n", objnotfound);
430 printf("No of objs modified but not found = %d\n", objmodnotfound);
432 //Decide what control message(s) to send
433 if(v_matchnolock == fixed->numread + fixed->nummod) {
434 //send TRANS_AGREE to Coordinator
435 control = TRANS_AGREE;
436 if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
437 perror("Error in sending control to Coordinator\n");
440 printf("DEBUG -> Sending TRANS_AGREE\n");
443 if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
444 //send TRANS_SOFT_ABORT to Coordinator
445 control = TRANS_SOFT_ABORT;
446 if((val = write(acceptfd, &control, sizeof(char))) <=0 ) {
447 perror("Error in sending control back to coordinator\n");
450 printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
451 //send number of oids not found and the missing oids
452 if((val = write(acceptfd, &objnotfound, sizeof(int))) <= 0) {
453 perror("Error in sending no of objects that are not found\n");
456 if(objnotfound != 0) {
457 if((val = write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound))) <= 0) {
458 perror("Error in sending objects that are not found\n");
464 //Do the following when TRANS_DISAGREE is sent
465 if(control == TRANS_DISAGREE) {
466 //Set the reference count of the object to 1 in mainstore for garbage collection
468 for(i = 0; i< fixed->nummod; i++) {
469 headptr = (objheader_t *) ptr;
471 ptr += sizeof(objheader_t) + classsize[headptr->type];
473 //Unlock objects that was locked in the trans
474 for(i = 0; i< objlocked ; i++) {
475 mobj = mhashSearch(oidlocked[i]);// find the header address
476 ((objheader_t *)mobj)->status &= ~(LOCK);
480 //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
481 transinfo->objmod = oidmod;
482 transinfo->objlocked = oidlocked;
483 transinfo->objnotfound = oidnotfound;
484 transinfo->modptr = modptr;
485 transinfo->nummod = fixed->nummod;
486 transinfo->numlocked = objlocked;
487 transinfo->numnotfound = objnotfound;
492 //Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
493 int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
495 int i = 0, offset = 0;
497 //Process each modified object saved in the mainobject store
498 for(i=0; i<transinfo->nummod; i++) {
499 if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
500 printf("mhashserach returns NULL\n");
502 //change reference count of older address and free space in objstr ??
503 header->rcount = 1; //Not sure what would be th val
504 //change ptr address in mhash table
505 mhashRemove(transinfo->objmod[i]);
506 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
507 offset += sizeof(objheader_t) + classsize[header->type];
508 //update object version
509 header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
510 header->version += 1;
512 for(i=0; i<transinfo->numlocked; i++) {
514 header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
515 header->status &= ~(LOCK);
518 //TODO Update location lookup table
520 //send ack to coordinator
521 control = TRANS_SUCESSFUL;
522 if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
523 perror("Error sending ACK to coordinator\n");
526 printf("DEBUG-> Completed the pending transaction\n");