Added new testcases
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <pthread.h>
5 #include <netdb.h>
6 #include <fcntl.h>
7 #include "dstm.h"
8 #include "mlookup.h"
9 #include "llookup.h"
10
11 #define LISTEN_PORT 2156 // TCP port that dstmListen() binds to and listens on
12 #define BACKLOG 10 // maximum number of pending connections, passed to listen()
13 #define RECEIVE_BUFFER_SIZE 2048 // size in bytes of the receive buffer declared in dstmAccept()
14
15 extern int classsize[]; // defined elsewhere; indexed by an object header's type, gives the byte size of the object data that follows the header
16
17 objstr_t *mainobjstore; // main object store: created in dstmInit(), readClientReq() allocates room for received modified objects from it
18
19 int dstmInit(void)
20 {
21         //Initialize main object store
22         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
23         if (mhashCreate(HASH_SIZE, LOADFACTOR))
24                 return 1; //failure
25         
26         if (lhashCreate(HASH_SIZE, LOADFACTOR))
27                 return 1; //failure
28         
29         //pthread_t threadListen;
30         //pthread_create(&threadListen, NULL, dstmListen, NULL);
31         
32         return 0;
33 }
34
35 void *dstmListen()
36 {
37         int listenfd, acceptfd;
38         struct sockaddr_in my_addr;
39         struct sockaddr_in client_addr;
40         socklen_t addrlength = sizeof(struct sockaddr);
41         pthread_t thread_dstm_accept;
42         int i;
43
44         listenfd = socket(AF_INET, SOCK_STREAM, 0);
45         if (listenfd == -1)
46         {
47                 perror("socket");
48                 exit(1);
49         }
50
51         my_addr.sin_family = AF_INET;
52         my_addr.sin_port = htons(LISTEN_PORT);
53         my_addr.sin_addr.s_addr = INADDR_ANY;
54         memset(&(my_addr.sin_zero), '\0', 8);
55
56         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
57         {
58                 perror("bind");
59                 exit(1);
60         }
61         
62         if (listen(listenfd, BACKLOG) == -1)
63         {
64                 perror("listen");
65                 exit(1);
66         }
67
68         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
69         while(1)
70         {
71                 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
72                 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
73         }
74         pthread_exit(NULL);
75 }
76
77 void *dstmAccept(void *acceptfd)
78 {
79         int numbytes,i, val, retval;
80         unsigned int oid;
81         char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
82         char *ptr;
83         void *srcObj;
84         objheader_t *h;
85         trans_commit_data_t transinfo;
86         
87         int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
88
89         printf("Recieved connection: fd = %d\n", (int)acceptfd);
90         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
91                 perror("Error in receiving control from coordinator\n");
92                 return;
93         }
94         switch(control) {
95                 case READ_REQUEST:
96                         if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
97                                 perror("Error receiving object from cooridnator\n");
98                                 return;
99                         }
100                         printf("DEBUG -> Recv READ_REQUEST from Coordinator for oid = %d\n", oid);
101                         srcObj = mhashSearch(oid);
102                         h = (objheader_t *) srcObj;
103                         size = sizeof(objheader_t) + sizeof(classsize[h->type]);
104                         if (h == NULL) {
105                                 ctrl = OBJECT_NOT_FOUND;
106                                 if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
107                                         perror("Error sending control msg to coordinator\n");
108                                 }
109                         } else {
110                                 //char responsemessage[sizeof(char)+sizeof(int)];
111                                 /* Type */
112                                 ctrl = OBJECT_FOUND;
113                                 if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
114                                         perror("Error sending control msg to coordinator\n");
115                                 }
116
117                                 //responsemessage[0]=OBJECT_FOUND;
118                                 /* Size of object */
119                                 //*((int *)(&responsemessage[1])) = sizeof(objheader_t) + classsize[h->type];
120                                 //if(send((int)acceptfd, &responsemessage, sizeof(responsemessage), 0) < 0) {
121                                 //      perror("Error sending control msg to coordinator\n");
122                                 //}
123
124                                 /* Size of object */
125                                 if(send((int)acceptfd, &size, sizeof(int), 0) < 0) {
126                                         perror("Error sending size of object to coordinator\n");
127                                 }
128                                 if(send((int)acceptfd, h, size, 0) < 0) {
129                                         perror("Error in sending object\n");
130                                 }
131                         }
132                         break;
133                 
134                 case READ_MULT_REQUEST:
135                         printf("DEBUG-> READ_MULT_REQUEST\n");
136                         break;
137         
138                 case MOVE_REQUEST:
139                         printf("DEBUG -> MOVE_REQUEST\n");
140                         break;
141
142                 case MOVE_MULT_REQUEST:
143                         printf("DEBUG -> MOVE_MULT_REQUEST\n");
144                         break;
145
146                 case TRANS_REQUEST:
147                         printf("DEBUG -> Recv TRANS_REQUEST from Coordinator accept_fd = %d\n", acceptfd);
148                         if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
149                                 printf("Error in readClientReq\n");
150                         }
151                         break;
152
153                 default:
154                         printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
155         }
156         if (close((int)acceptfd) == -1)
157                 perror("close");
158         else 
159                 printf("Closed connection: fd = %d\n", (int)acceptfd);
160         
161         pthread_exit(NULL);
162         printf("DEBUG -> Exiting dstmAccept\n");
163 }
164
165 int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
166         char *ptr, control, prevctrl, sendctrl, newctrl;
167         void *modptr, *header;
168         objheader_t *tmp_header;
169         fixed_data_t fixed;
170         int sum = 0, i, N, n, val, retval;
171
172         //Reads to process the TRANS_REQUEST protocol further
173         // Read fixed_data
174         N = sizeof(fixed) - 1;
175         ptr = (char *)&fixed;;
176         fixed.control = TRANS_REQUEST;
177         do {
178                 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
179         //      printf("DEBUG -> 1. Reading %d bytes \n", n);
180                 sum += n;
181         } while(sum < N && n != 0); 
182
183         //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
184         // Read list of mids
185         int mcount = fixed.mcount;
186         N = mcount * sizeof(unsigned int);
187         unsigned int listmid[mcount];
188         ptr = (char *) listmid;
189         sum = 0;
190         do {
191                 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
192         //      printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
193                 sum += n;
194         } while(sum < N && n != 0);
195
196         // Read oid and version tuples
197         int numread = fixed.numread;
198         N = numread * (sizeof(unsigned int) + sizeof(short));
199         char objread[N];
200         if(numread != 0) { // If pile contains objects to be read 
201         //      N = numread * (sizeof(unsigned int) + sizeof(short));
202         //      char objread[N];
203                 sum = 0;
204                 do {
205                         n = recv((int)acceptfd, (void *) objread, N, 0);
206                 //      printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
207                         sum += n;
208                 } while(sum < N && n != 0);
209 //              printf("DEBUG -> Recv objs from Coordinator %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
210         }
211         
212         // Read modified objects
213         if(fixed.nummod != 0) { // If pile contains modified objects 
214                 if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
215                         printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
216                         return 1;
217                 }
218                 sum = 0;
219                 do { // Recv the objs that are modified at Coordinator
220                         n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
221                 //      printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
222                         sum += n;
223                 } while (sum < fixed.sum_bytes && n != 0);
224         }
225
226         //Send control message as per all votes from all oids in the machine
227         if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0) {
228                 printf("Handle Trans Request Error %s, %d\n", __FILE__, __LINE__);
229         }
230
231         //Read for new control message from Coordiator
232         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
233                 printf("DEBUG -> Error receiving control, received %d\n", control);
234                 return 1;
235         }
236
237         printf("DEBUG-> Control message after first call to handleTransReq is %d\n", control);
238         fflush(stdout);
239
240         switch(control) {
241                 case TRANS_ABORT:
242                         printf("DEBUG -> Recv TRANS_ABORT from Coordinator accept_fd %d\n", acceptfd) ;
243                         //send ack to coordinator
244                         sendctrl = TRANS_SUCESSFUL;
245                         if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
246                                 perror("Error sending ACK to coordinator\n");
247                                 return 1;
248                         }
249                         //Mark all ref counts as 1 and do garbage collection
250                         ptr = modptr;
251                         for(i = 0; i< fixed.nummod; i++) {
252                                 tmp_header = (objheader_t *)ptr;
253                                 tmp_header->rcount = 1;
254                                 ptr += sizeof(objheader_t) + classsize[tmp_header->type];
255                         }
256                         //Unlock objects that was locked in this machine due to this transaction
257                         for(i = 0; i< transinfo->numlocked; i++) {
258                                 header = mhashSearch(transinfo->objlocked[i]);// find the header address
259                                 ((objheader_t *)header)->status &= ~(LOCK);             
260                         }
261                         ptr = NULL;
262                         return 0;
263                 case TRANS_COMMIT:
264                         printf("DEBUG -> Recv TRANS_COMMIT from Coordinator accept_fd = %d\n", acceptfd);
265                         if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
266                                 printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
267                         }
268                         break;
269                 case TRANS_ABORT_BUT_RETRY_COMMIT:
270                         printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator acceptfd = %d\n", acceptfd);
271                         //Process again after waiting for sometime and on prev control message sent
272                         sleep(2);
273                         switch(prevctrl) {
274                                 case TRANS_AGREE:
275                                         sendctrl = TRANS_AGREE;
276                                         if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
277                                                 perror("Error sending ACK to coordinator\n");
278                                         }
279                                         //sleep(5);
280                                         break;
281                                 case TRANS_SOFT_ABORT:
282                                         if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
283                                                 printf("Handle Trans Request Error for second call%s, %d\n", __FILE__, __LINE__);
284                                         }
285                                         //If no change in previous control message that was sent then ABORT transaction
286                                         if(newctrl == TRANS_SOFT_ABORT){
287                                                 //Send ABORT
288                                                 newctrl = TRANS_DISAGREE;
289                                                 if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
290                                                         perror("Error sending ACK to coordinator\n");
291                                                 }
292                                                 //Set the reference count of the object to 1 in mainstore for garbage collection
293                                                 ptr = modptr;
294                                                 for(i = 0; i< fixed.nummod; i++) {
295                                                         tmp_header = (objheader_t *) ptr;
296                                                         tmp_header->rcount = 1;
297                                                         ptr += sizeof(objheader_t) + classsize[tmp_header->type];
298                                                 }
299                                                 //Unlock objects that was locked in this machine due to this transaction
300                                                 for(i = 0; i< transinfo->numlocked; i++) {
301                                                         ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
302                                                         ((objheader_t *)ptr)->status &= ~(LOCK);                
303                                                 }
304                                         //      return 0;
305                                         } else if(newctrl == TRANS_AGREE) {
306                                                 newctrl = TRANS_AGREE;
307                                                 //Send new control message
308                                                 if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
309                                                         perror("Error sending ACK to coordinator\n");
310                                                 }
311                                         }
312
313                                         break;
314                         }
315
316                         break;
317                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
318                         //TODO expect another transrequest from client
319                         printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator accept_fd%d\n", acceptfd);
320                         break;
321                 default:
322                         printf("No response to TRANS_AGREE OR DISAGREE control\n");
323                         //TODO Use fixed.trans_id  TID since Client may have died
324                         break;
325         }
326
327         //Free memory
328         printf("DEBUG -> Freeing...");
329         fflush(stdout);
330         if (transinfo->objmod != NULL) {
331                 free(transinfo->objmod);
332                 transinfo->objmod = NULL;
333         }
334         if (transinfo->objlocked != NULL) {
335                 free(transinfo->objlocked);
336                 transinfo->objlocked = NULL;
337         }
338         if (transinfo->objnotfound != NULL) {
339                 free(transinfo->objnotfound);
340                 transinfo->objnotfound = NULL;
341         }
342         return 0;
343 }
344
345 //This function runs a decision after all objects are weighed under one of the 4 possibilities 
346 //and returns the appropriate control message to the Ccordinator 
347 char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
348         int val;
349         short version;
350         char control = 0, ctrlmissoid, *ptr;
351         int i, j = 0;
352         unsigned int oid;
353         unsigned int *oidnotfound, *oidlocked, *oidmod;
354
355         oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
356         oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
357         oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
358         // Counters and arrays to formulate decision on control message to be sent
359         int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
360         int objmodnotfound = 0, nummodfound = 0;
361         void *mobj;
362         objheader_t *headptr;
363         
364         //Process each object present in the pile 
365         ptr = modptr;
366         
367         //Process each oid in the machine pile/ group
368         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
369                 if (i < fixed->numread) {//Object is read
370                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
371                         incr *= i;
372                         oid = *((unsigned int *)(objread + incr));
373                         incr += sizeof(unsigned int);
374                         version = *((short *)(objread + incr));
375                 } else {//Obj is modified
376                         headptr = (objheader_t *) ptr;
377                         oid = headptr->oid;
378                         oidmod[objmod] = oid;//Array containing modified oids
379                         objmod++;
380                         version = headptr->version;
381                         ptr += sizeof(objheader_t) + classsize[headptr->type];
382                 }
383                 //Check if object is still present in the machine since the beginning of TRANS_REQUEST
384                 if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
385                         //Save the oids not found for later use
386                         oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
387                         objnotfound++;
388                 } else { // If obj found in machine (i.e. has not moved)
389                         //Check if obj is locked
390                         if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {           
391                                 if (version == ((objheader_t *)mobj)->version) {      // If version match
392                                         v_matchlock++;
393                                 } else {//If versions don't match ..HARD ABORT
394                                         v_nomatch++;
395                                         //send TRANS_DISAGREE to Coordinator
396                                         control = TRANS_DISAGREE;
397                                         if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
398                                                 perror("Error in sending control to the Coordinator\n");
399                                                 return 0;
400                                         }
401                                         printf("DEBUG -> Sending TRANS_DISAGREE acceptfd = %d\n", acceptfd);
402                                         return control;
403                                 }
404                         } else {//Obj is not locked , so lock object
405                                 ((objheader_t *)mobj)->status |= LOCK;
406                                 //Save all object oids that are locked on this machine during this transaction request call
407                                 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
408                                 printf("DEBUG -> Obj locked are %d\n",((objheader_t *)mobj)->oid);
409                                 objlocked++;
410                                 if (version == ((objheader_t *)mobj)->version) { //If versions match
411                                         v_matchnolock++;
412                                 } else { //If versions don't match
413                                         v_nomatch++;
414                                         //send TRANS_DISAGREE to Coordinator
415                                         control = TRANS_DISAGREE;
416                                         if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
417                                                 perror("Error in sending control to the Coordinator\n");
418                                                 return 0;
419                                         }
420                                         printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd);
421                                         return control;
422                                 }
423                         }
424                 }
425         }
426
427         printf("No of objs locked = %d\n", objlocked);
428         printf("No of v_nomatch = %d\n", v_nomatch);
429         printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
430         printf("No of objs v_match but had locks before = %d\n", v_matchlock);
431         printf("No of objs not found = %d\n", objnotfound);
432         printf("No of objs modified but not found = %d\n", objmodnotfound);
433
434         //Decide what control message(s) to send
435         //Cond to send TRANS_AGREE
436         if(v_matchnolock == fixed->numread + fixed->nummod) {
437                 //send TRANS_AGREE to Coordinator
438                 control = TRANS_AGREE;
439                 if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
440                         perror("Error in sending control to Coordinator\n");
441                         return 0;
442                 }
443                 printf("DEBUG -> Sending TRANS_AGREE accept_fd = %d\n", acceptfd);
444         }
445         //Condition to send TRANS_SOFT_ABORT
446         if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
447                 //send TRANS_SOFT_ABORT to Coordinator
448                 control = TRANS_SOFT_ABORT;
449                 if((val = write(acceptfd, &control, sizeof(char))) <=0 ) {
450                         perror("Error in sending control back to coordinator\n");
451                         return 0;
452                 }
453                 printf("DEBUG -> Sending TRANS_SOFT_ABORT accept_fd = %d\n", acceptfd);
454                 //send number of oids not found and the missing oids 
455                 if((val = write(acceptfd, &objnotfound, sizeof(int))) <= 0) {
456                         perror("Error in sending no of objects that are not found\n");
457                         return 0;
458                 }
459                 if(objnotfound != 0) { 
460                         if((val = write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound))) <= 0) {
461                                 perror("Error in sending objects that are not found\n");
462                                 return 0;
463                         }
464                 }
465         }
466         
467         //Do the following when TRANS_DISAGREE is sent
468         if(control == TRANS_DISAGREE) {
469                 //Set the reference count of the object to 1 in mainstore for garbage collection
470                 ptr = modptr;
471                 for(i = 0; i< fixed->nummod; i++) {
472                         headptr = (objheader_t *) ptr;
473                         headptr->rcount = 1;
474                         ptr += sizeof(objheader_t) + classsize[headptr->type];
475                 }
476                 //Unlock objects that was locked in the trans
477                 for(i = 0; i< objlocked ; i++) {
478                         mobj = mhashSearch(oidlocked[i]);// find the header address
479                         ((objheader_t *)mobj)->status &= ~(LOCK);               
480                 }       
481         }       
482
483         //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
484         transinfo->objmod = oidmod;
485         transinfo->objlocked = oidlocked;
486         transinfo->objnotfound = oidnotfound;
487         transinfo->modptr = modptr;
488         transinfo->nummod = fixed->nummod;
489         transinfo->numlocked = objlocked;
490         transinfo->numnotfound = objnotfound;
491         
492         return control;
493 }
494
495 //Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
496 int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
497         objheader_t *header;
498         int i = 0, offset = 0;
499         char control;
500         //Process each modified object saved in the mainobject store
501         for(i=0; i<transinfo->nummod; i++) {
502                 if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
503                         printf("mhashsearch returns NULL %s, %d\n", __FILE__, __LINE__);
504                 }
505                 //change reference count of older address and free space in objstr ??
506                 header->rcount = 1; //Not sure what would be th val
507                 //change ptr address in mhash table
508                 printf("DEBUG -> Removing object oid = %d\n", transinfo->objmod[i]);
509                 mhashRemove(transinfo->objmod[i]);
510                 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
511                 offset += sizeof(objheader_t) + classsize[header->type];
512                 //update object version
513                 header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
514                 header->version += 1; 
515         }
516         for(i=0; i<transinfo->numlocked; i++) {
517                 //unlock objects
518                 header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
519                 header->status &= ~(LOCK);
520         }
521
522         //TODO Update location lookup table
523
524         //send ack to coordinator
525         control = TRANS_SUCESSFUL;
526         //FOR TESTING
527         printf("DEBUG-> Sending TRANS_SUCCESSFUL from accept_fd = %d\n", acceptfd);
528         if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
529                 perror("Error sending ACK to coordinator\n");
530         }
531         
532         return 0;
533 }
534