// TCP port the listening socket is bound to.
11 #define LISTEN_PORT 2156
12 #define BACKLOG 10 //max pending connections
// Size in bytes of the `buffer` array declared in dstmAccept.
13 #define RECEIVE_BUFFER_SIZE 2048
// Defined elsewhere. Indexed by an object header's `type` field; the value is
// added to sizeof(objheader_t) when stepping from one stored object to the next.
15 extern int classsize[];
// Object store created during initialisation; readClientReq allocates space
// from it for the modified objects it receives.
17 objstr_t *mainobjstore;
// Initialisation statements; the enclosing function header is not among the
// lines shown here.
21 //Initialize main object store
// NOTE(review): no check of the objstrCreate result is visible here.
22 mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
// Presumably creates the table queried by mhashSearch/mhashInsert/mhashRemove
// (confirm against the mhash implementation). The branch taken on a non-zero
// return is not shown here.
23 if (mhashCreate(HASH_SIZE, LOADFACTOR))
// A second table created with the same parameters; its role is not visible
// in this listing - TODO confirm.
26 if (lhashCreate(HASH_SIZE, LOADFACTOR))
// Listener setup and hand-off of accepted connections; the enclosing function
// header is not among the lines shown here.
34 int listenfd, acceptfd;
35 struct sockaddr_in my_addr;
36 struct sockaddr_in client_addr;
// Used as the address length for bind() and as the in/out length for accept().
37 socklen_t addrlength = sizeof(struct sockaddr);
38 pthread_t thread_dstm_accept;
// Stream socket over IPv4.
42 listenfd = socket(AF_INET, SOCK_STREAM, 0);
// setsockflag is declared on a line not shown here.
49 if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
// NOTE(review): SO_NOSIGPIPE is not defined on every platform, while the
// send() calls in this file rely on MSG_NOSIGNAL; confirm the intended target.
54 if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
// Bind to LISTEN_PORT on any local address.
60 my_addr.sin_family = AF_INET;
61 my_addr.sin_port = htons(LISTEN_PORT);
62 my_addr.sin_addr.s_addr = INADDR_ANY;
// Clear the 8-byte sin_zero padding field.
63 memset(&(my_addr.sin_zero), '\0', 8);
65 if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
71 if (listen(listenfd, BACKLOG) == -1)
77 printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
// The accepted descriptor is passed by value, inside the void * argument, to
// a new thread that runs dstmAccept.
// NOTE(review): the accept() result is handed to pthread_create() without a
// check, and the pthread_create() result is discarded.
80 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
81 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
// Thread entry point that services one accepted connection.
//
// acceptfd: the connected socket descriptor, carried by value inside the
//           void * argument and cast back to int at every use.
//
// Reads a one-byte opcode from the socket and dispatches on it. On the lines
// visible here: a single-object read looks the oid up with mhashSearch and
// replies with OBJECT_NOT_FOUND, or with OBJECT_FOUND + size + object bytes;
// the READ_MULT / MOVE / MOVE_MULT requests are only logged; a TRANS_REQUEST
// is passed to readClientReq. The socket is closed near the end.
86 void *dstmAccept(void *acceptfd)
88 int numbytes,i, val, retval;
// control receives the opcode; ctrl holds the one-byte not-found reply.
90 char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
// Passed to readClientReq for the TRANS_REQUEST case.
94 trans_commit_data_t transinfo;
// NOTE(review): fd_flags is not used on any line visible here.
96 int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
98 printf("Recieved connection: fd = %d\n", (int)acceptfd);
// The first byte on the wire is the request opcode.
99 if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
100 perror("Error in receiving control from coordinator\n");
// Single-object read: the opcode is followed by the oid.
105 if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
106 perror("Error receiving object from cooridnator\n");
109 printf("DEBUG -> Recv READ_REQUEST from Coordinator for oid = %d\n", oid);
110 srcObj = mhashSearch(oid);
111 h = (objheader_t *) srcObj;
// Bytes to send = header + the class's entry in the classsize table, matching
// how the rest of this file steps over stored objects. (sizeof applied to the
// table entry would always be sizeof(int).) A failed lookup leaves h NULL, so
// it must not be dereferenced; size is 0 in that case.
112 size = (h == NULL) ? 0 : (int)(sizeof(objheader_t) + classsize[h->type]);
114 ctrl = OBJECT_NOT_FOUND;
// send() returns a signed count; compare against a signed size so that a -1
// failure is not converted to a huge unsigned value and missed.
115 if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < (int)sizeof(char)) {
116 perror("Error sending control msg to coordinator\n");
// Reply header: one status byte followed by the size as a native int.
// NOTE(review): the 5-byte layout assumes a 4-byte int, and the store through
// the cast pointer is unaligned.
120 char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
121 *((int *)&msg[1])=size;
122 if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < (int)sizeof(msg)) {
123 perror("Error sending size of object to coordinator\n");
125 if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
126 perror("Error in sending object\n");
131 case READ_MULT_REQUEST:
132 printf("DEBUG-> READ_MULT_REQUEST\n");
136 printf("DEBUG -> MOVE_REQUEST\n");
139 case MOVE_MULT_REQUEST:
140 printf("DEBUG -> MOVE_MULT_REQUEST\n");
144 printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
// A non-zero result from readClientReq is treated as an error.
145 if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
146 printf("Error in readClientReq\n");
151 printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
153 if (close((int)acceptfd) == -1)
156 printf("Closed connection: fd = %d\n", (int)acceptfd);
159 printf("DEBUG -> Exiting dstmAccept\n");
// Handles the rest of a TRANS_REQUEST after dstmAccept has consumed the opcode.
//
// acceptfd:  connected socket to the coordinator.
// transinfo: passed to handleTransReq and transCommitProcess; its objmod,
//            objlocked and objnotfound arrays are freed and set to NULL at
//            the end of this function.
//
// Visible steps: receive the fixed-size request part, the listmid array
// (mcount unsigned ints), the (oid, version) tuples of read objects and the
// modified objects (into space allocated from mainobjstore); vote through
// handleTransReq; then read the coordinator's follow-up control byte and act
// on it (abort, commit, retry, soft abort).
//
// Return: dstmAccept treats a non-zero value as an error.
162 int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
163 char *ptr, control, prevctrl, sendctrl, newctrl;
164 void *modptr, *header;
165 objheader_t *tmp_header;
166 
167 int sum = 0, i, N, n, val, retval;
169 //Reads to process the TRANS_REQUEST protocol further
// The control byte was already read by the caller, so only the remaining
// sizeof(fixed) - 1 bytes are received, starting one byte into `fixed`.
171 N = sizeof(fixed) - 1;
172 ptr = (char *)&fixed;;
173 fixed.control = TRANS_REQUEST;
// NOTE(review): arithmetic on the (void *) cast relies on a compiler extension.
175 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
// NOTE(review): a recv() error (n == -1) does not end this loop.
177 } while(sum < N && n != 0);
// NOTE(review): mcount comes from the peer and sizes a stack array unchecked.
180 int mcount = fixed.mcount;
181 N = mcount * sizeof(unsigned int);
182 unsigned int listmid[mcount];
183 ptr = (char *) listmid;
186 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
188 } while(sum < N && n != 0);
190 // Read oid and version tuples
191 int numread = fixed.numread;
192 N = numread * (sizeof(unsigned int) + sizeof(short));
194 if(numread != 0) { // If pile contains objects to be read
// Continue after the bytes already received and ask only for what is still
// missing, as the other receive loops in this function do. Receiving at the
// start of objread with the full length on every pass would overwrite earlier
// data after a short read and could write past the end of the buffer.
197 n = recv((int)acceptfd, (char *) objread + sum, N - sum, 0);
199 } while(sum < N && n != 0);
202 // Read modified objects
203 if(fixed.nummod != 0) { // If pile contains modified objects
204 if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
205 printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
209 do { // Recv the objs that are modified at Coordinator
210 n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
212 } while (sum < fixed.sum_bytes && n != 0);
215 //Send control message as per all votes from all oids in the machine
// handleTransReq returning 0 is treated as an error.
216 if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
217 printf("Handle req error\n");
220 //Read for new control message from Coordinator
221 if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
222 perror("Error in receiving control message");
228 printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
229 //send ack to coordinator
230 sendctrl = TRANS_SUCESSFUL;
// send() returns a signed count; compare against a signed size so that a -1
// failure is not converted to a huge unsigned value and missed.
231 if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < (int)sizeof(char)) {
232 perror("Error sending ACK to coordinator\n");
235 //Mark all ref counts as 1 and do garbage collection
237 for(i = 0; i< fixed.nummod; i++) {
238 tmp_header = (objheader_t *)ptr;
239 tmp_header->rcount = 1;
// Step to the next received object: header plus class payload size.
240 ptr += sizeof(objheader_t) + classsize[tmp_header->type];
242 //Unlock objects that were locked in this machine due to this transaction
243 for(i = 0; i< transinfo->numlocked; i++) {
244 header = mhashSearch(transinfo->objlocked[i]);// find the header address
// NOTE(review): the lookup result is dereferenced without a NULL check.
245 ((objheader_t *)header)->status &= ~(LOCK);
250 printf("DEBUG -> Recv TRANS_COMMIT from Coordinator accept_fd = %d\n", acceptfd);
251 if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
252 printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
255 case TRANS_ABORT_BUT_RETRY_COMMIT:
256 printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
257 //Process again after waiting for sometime and on prev control message sent
261 sendctrl = TRANS_AGREE;
262 if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < (int)sizeof(char)) {
263 perror("Error sending ACK to coordinator\n");
266 case TRANS_SOFT_ABORT:
// Re-run the vote with the same request data.
267 if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
268 printf("Handle req error\n");
270 //If no change in previous control message that was sent then ABORT transaction
271 if(newctrl == TRANS_SOFT_ABORT){
273 newctrl = TRANS_DISAGREE;
274 if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < (int)sizeof(char)) {
275 perror("Error sending ACK to coordinator\n");
277 //Set the reference count of the object to 1 in mainstore for garbage collection
279 for(i = 0; i< fixed.nummod; i++) {
280 tmp_header = (objheader_t *) ptr;
281 tmp_header->rcount = 1;
282 ptr += sizeof(objheader_t) + classsize[tmp_header->type];
284 //Unlock objects that were locked in this machine due to this transaction
285 for(i = 0; i< transinfo->numlocked; i++) {
286 ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
// NOTE(review): the lookup result is dereferenced without a NULL check.
287 ((objheader_t *)ptr)->status &= ~(LOCK);
289 } else if(newctrl == TRANS_AGREE) {
290 newctrl = TRANS_AGREE;
291 //Send new control message
292 if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < (int)sizeof(char)) {
293 perror("Error sending ACK to coordinator\n");
301 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
302 //TODO expect another transrequest from client
303 printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
306 printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
307 //TODO Use fixed.trans_id TID since Client may have died
311 printf("DEBUG -> Freeing...");
// Release the arrays that handleTransReq allocated and stored in transinfo.
313 if (transinfo->objmod != NULL) {
314 free(transinfo->objmod);
315 transinfo->objmod = NULL;
317 if (transinfo->objlocked != NULL) {
318 free(transinfo->objlocked);
319 transinfo->objlocked = NULL;
321 if (transinfo->objnotfound != NULL) {
322 free(transinfo->objnotfound);
323 transinfo->objnotfound = NULL;
328 //This function runs a decision after all objects are weighed under one of the 4 possibilities
329 //and returns the appropriate control message to the Coordinator
//
// acceptfd:  connected socket to the coordinator; the vote is sent on it.
// fixed:     fixed-size part of the request (numread and nummod are used here).
// transinfo: filled in at the end with the arrays and counters gathered here.
// listmid:   received by the caller; not referenced on the lines visible here.
// objread:   packed (oid, version) tuples for the objects that were read.
// modptr:    start of the received modified objects (header + payload each).
//
// Return: the control byte that was sent (TRANS_AGREE, TRANS_SOFT_ABORT or
// TRANS_DISAGREE on the visible paths); the caller treats 0 as an error.
//
// Ownership: the three calloc'd arrays are stored in transinfo at the end of
// this function; readClientReq frees them.
330 char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
333 char control = 0, ctrlmissoid, *ptr;
336 unsigned int *oidnotfound, *oidlocked, *oidmod;
// NOTE(review): no check of these allocations is visible here.
338 oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
339 oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
340 oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
341 // Counters and arrays to formulate decision on control message to be sent
342 int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
343 int objmodnotfound = 0, nummodfound = 0;
345 objheader_t *headptr;
347 //Process each object present in the pile
349 //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
351 //Process each oid in the machine pile/ group
// The first numread iterations take their oid/version from objread; the
// remaining nummod iterations take them from the object headers at ptr.
352 for (i = 0; i < fixed->numread + fixed->nummod; i++) {
353 if (i < fixed->numread) {//Object is read
354 int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
// NOTE(review): these casts read an unsigned int and a short at offsets that
// are multiples of the 6-byte tuple size, so the loads may be unaligned.
356 oid = *((unsigned int *)(objread + incr));
357 incr += sizeof(unsigned int);
358 version = *((short *)(objread + incr));
359 } else {//Obj is modified
360 headptr = (objheader_t *) ptr;
362 oidmod[objmod] = oid;//Array containing modified oids
364 version = headptr->version;
// Step to the next modified object: header plus class payload size.
365 ptr += sizeof(objheader_t) + classsize[headptr->type];
367 //Check if object is still present in the machine since the beginning of TRANS_REQUEST
368 if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
369 //Save the oids not found for later use
// mobj is NULL in this branch, so record the oid that was looked up rather
// than reading it through mobj.
370 oidnotfound[objnotfound] = oid;
372 } else { // If obj found in machine (i.e. has not moved)
373 //Check if obj is locked
374 if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {
375 if (version == ((objheader_t *)mobj)->version) { // If version match
377 } else {//If versions don't match ..HARD ABORT
379 //send TRANS_DISAGREE to Coordinator
380 control = TRANS_DISAGREE;
// val is a signed count; compare against a signed size so that a -1 failure
// is not converted to a huge unsigned value and missed.
381 if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < (int)sizeof(char)) {
382 perror("Error in sending control to the Coordinator\n");
385 printf("DEBUG -> Sending TRANS_DISAGREE\n");
388 } else {//Obj is not locked , so lock object
389 ((objheader_t *)mobj)->status |= LOCK;
392 //Save all object oids that are locked on this machine during this transaction request call
393 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
394 printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid);
396 if (version == ((objheader_t *)mobj)->version) { //If versions match
398 } else { //If versions don't match
400 //send TRANS_DISAGREE to Coordinator
401 control = TRANS_DISAGREE;
402 if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < (int)sizeof(char)) {
403 perror("Error in sending control to the Coordinator\n");
406 printf("DEBUG -> Sending TRANS_DISAGREE\n");
413 printf("No of objs locked = %d\n", objlocked);
414 printf("No of v_nomatch = %d\n", v_nomatch);
415 printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
416 printf("No of objs v_match but had locks before = %d\n", v_matchlock);
417 printf("No of objs not found = %d\n", objnotfound);
418 printf("No of objs modified but not found = %d\n", objmodnotfound);
420 //Decide what control message(s) to send
421 //Cond to send TRANS_AGREE
// Agree only when v_matchnolock accounts for every object in the request.
422 if(v_matchnolock == fixed->numread + fixed->nummod) {
423 //send TRANS_AGREE to Coordinator
424 control = TRANS_AGREE;
425 if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < (int)sizeof(char)) {
426 perror("Error in sending control to Coordinator\n");
429 printf("DEBUG -> Sending TRANS_AGREE\n");
431 //Condition to send TRANS_SOFT_ABORT
// Soft abort: no version mismatch, but v_matchlock or objnotfound is non-zero.
432 if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
433 //send TRANS_SOFT_ABORT to Coordinator
434 control = TRANS_SOFT_ABORT;
// Message: one status byte followed by the not-found count as a native int.
// NOTE(review): the 5-byte layout assumes a 4-byte int, and the store through
// the cast pointer is unaligned.
435 char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
436 *((int*)&msg[1])=objnotfound;
438 printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
439 //send number of oids not found and the missing oids
440 if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < (int)sizeof(msg)) {
441 perror("Error in sending no of objects that are not found\n");
444 if(objnotfound != 0) {
445 int size=sizeof(unsigned int)*objnotfound;
446 if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
447 perror("Error in sending objects that are not found\n");
453 //Do the following when TRANS_DISAGREE is sent
454 if(control == TRANS_DISAGREE) {
455 //Set the reference count of the object to 1 in mainstore for garbage collection
457 for(i = 0; i< fixed->nummod; i++) {
458 headptr = (objheader_t *) ptr;
460 ptr += sizeof(objheader_t) + classsize[headptr->type];
462 //Unlock objects that were locked in the trans
463 for(i = 0; i< objlocked ; i++) {
464 mobj = mhashSearch(oidlocked[i]);// find the header address
// NOTE(review): the lookup result is dereferenced without a NULL check.
465 ((objheader_t *)mobj)->status &= ~(LOCK);
469 //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
470 transinfo->objmod = oidmod;
471 transinfo->objlocked = oidlocked;
472 transinfo->objnotfound = oidnotfound;
473 transinfo->modptr = modptr;
474 transinfo->nummod = fixed->nummod;
475 transinfo->numlocked = objlocked;
476 transinfo->numnotfound = objnotfound;
481 //Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
//
// transinfo: objmod/nummod list the modified oids, modptr points at their new
//            copies, objlocked/numlocked list the oids to unlock.
// acceptfd:  connected socket to the coordinator; the ACK is sent on it.
//
// For each modified oid the hash entry is re-pointed at the new copy inside
// modptr and that copy's version is incremented; then every oid in objlocked
// has its LOCK bit cleared and TRANS_SUCESSFUL is sent.
//
// Return: readClientReq treats a non-zero value as an error.
482 int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
// offset tracks where the current modified object starts inside modptr.
484 int i = 0, offset = 0;
486 //Process each modified object saved in the mainobject store
487 for(i=0; i<transinfo->nummod; i++) {
488 if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
489 printf("mhashserach returns NULL\n");
491 //change reference count of older address and free space in objstr ??
492 header->rcount = 1; //Not sure what would be the val
493 //change ptr address in mhash table
494 printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]);
495 mhashRemove(transinfo->objmod[i]);
496 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
// `header` still refers to the old copy here; its type gives the object size.
497 offset += sizeof(objheader_t) + classsize[header->type];
498 //update object version
// Look the oid up again so the version bump lands on the newly inserted copy.
499 header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
500 header->version += 1;
502 for(i=0; i<transinfo->numlocked; i++) {
// NOTE(review): the lookup result is dereferenced without a NULL check.
504 header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
505 header->status &= ~(LOCK);
508 //TODO Update location lookup table
510 //send ack to coordinator
511 control = TRANS_SUCESSFUL;
513 printf("DEBUG-> Transaction is SUCCESSFUL \n");
// send() returns a signed count; compare against a signed size so that a -1
// failure is not converted to a huge unsigned value and missed.
514 if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < (int)sizeof(char)) {
515 perror("Error sending ACK to coordinator\n");