my changes:
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <errno.h>
#include <unistd.h>
#include <pthread.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "dstm.h"
#include "mlookup.h"
#include "llookup.h"
10
11 #define LISTEN_PORT 2156
12 #define BACKLOG 10 //max pending connections
13 #define RECEIVE_BUFFER_SIZE 2048
14
15 extern int classsize[];
16
17 objstr_t *mainobjstore;
18
19 int dstmInit(void)
20 {
21         //Initialize main object store
22         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
23         if (mhashCreate(HASH_SIZE, LOADFACTOR))
24                 return 1; //failure
25         
26         if (lhashCreate(HASH_SIZE, LOADFACTOR))
27                 return 1; //failure
28         
29         return 0;
30 }
31
32 void *dstmListen()
33 {
34         int listenfd, acceptfd;
35         struct sockaddr_in my_addr;
36         struct sockaddr_in client_addr;
37         socklen_t addrlength = sizeof(struct sockaddr);
38         pthread_t thread_dstm_accept;
39         int i;
40         int setsockflag=1;
41
42         listenfd = socket(AF_INET, SOCK_STREAM, 0);
43         if (listenfd == -1)
44         {
45                 perror("socket");
46                 exit(1);
47         }
48
49         if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
50           perror("socket");
51           exit(1);
52         }
53 #ifdef MAC
54         if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
55           perror("socket");
56           exit(1);
57         }
58 #endif
59
60         my_addr.sin_family = AF_INET;
61         my_addr.sin_port = htons(LISTEN_PORT);
62         my_addr.sin_addr.s_addr = INADDR_ANY;
63         memset(&(my_addr.sin_zero), '\0', 8);
64
65         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
66         {
67                 perror("bind");
68                 exit(1);
69         }
70         
71         if (listen(listenfd, BACKLOG) == -1)
72         {
73                 perror("listen");
74                 exit(1);
75         }
76
77         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
78         while(1)
79         {
80                 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
81                 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
82         }
83         pthread_exit(NULL);
84 }
85
86 void *dstmAccept(void *acceptfd)
87 {
88         int numbytes,i, val, retval;
89         unsigned int oid;
90         char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
91         char *ptr;
92         void *srcObj;
93         objheader_t *h;
94         trans_commit_data_t transinfo;
95         
96         int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
97
98         printf("Recieved connection: fd = %d\n", (int)acceptfd);
99         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
100                 perror("Error in receiving control from coordinator\n");
101                 return;
102         }
103         switch(control) {
104                 case READ_REQUEST:
105                         if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
106                                 perror("Error receiving object from cooridnator\n");
107                                 return;
108                         }
109                         printf("DEBUG -> Recv READ_REQUEST from Coordinator for oid = %d\n", oid);
110                         srcObj = mhashSearch(oid);
111                         h = (objheader_t *) srcObj;
112                         size = sizeof(objheader_t) + sizeof(classsize[h->type]);
113                         if (h == NULL) {
114                                 ctrl = OBJECT_NOT_FOUND;
115                                 if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
116                                         perror("Error sending control msg to coordinator\n");
117                                 }
118                         } else {
119                                 /* Type */
120                           char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
121                           *((int *)&msg[1])=size;
122                           if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
123                             perror("Error sending size of object to coordinator\n");
124                           }
125                           if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
126                             perror("Error in sending object\n");
127                           }
128                         }
129                         break;
130                 
131                 case READ_MULT_REQUEST:
132                         printf("DEBUG-> READ_MULT_REQUEST\n");
133                         break;
134         
135                 case MOVE_REQUEST:
136                         printf("DEBUG -> MOVE_REQUEST\n");
137                         break;
138
139                 case MOVE_MULT_REQUEST:
140                         printf("DEBUG -> MOVE_MULT_REQUEST\n");
141                         break;
142
143                 case TRANS_REQUEST:
144                         printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
145                         if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
146                                 printf("Error in readClientReq\n");
147                         }
148                         break;
149
150                 default:
151                         printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
152         }
153         if (close((int)acceptfd) == -1)
154                 perror("close");
155         else 
156                 printf("Closed connection: fd = %d\n", (int)acceptfd);
157         
158         pthread_exit(NULL);
159         printf("DEBUG -> Exiting dstmAccept\n");
160 }
161
162 int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
163         char *ptr, control, prevctrl, sendctrl, newctrl;
164         void *modptr, *header;
165         objheader_t *tmp_header;
166         fixed_data_t fixed;
167         int sum = 0, i, N, n, val, retval;
168
169         //Reads to process the TRANS_REQUEST protocol further
170         // Read fixed_data
171         N = sizeof(fixed) - 1;
172         ptr = (char *)&fixed;;
173         fixed.control = TRANS_REQUEST;
174         do {
175                 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
176                 sum += n;
177         } while(sum < N && n != 0); 
178
179         // Read list of mids
180         int mcount = fixed.mcount;
181         N = mcount * sizeof(unsigned int);
182         unsigned int listmid[mcount];
183         ptr = (char *) listmid;
184         sum = 0;
185         do {
186                 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
187                 sum += n;
188         } while(sum < N && n != 0);
189
190         // Read oid and version tuples
191         int numread = fixed.numread;
192         N = numread * (sizeof(unsigned int) + sizeof(short));
193         char objread[N];
194         if(numread != 0) { // If pile contains objects to be read 
195                 sum = 0;
196                 do {
197                         n = recv((int)acceptfd, (void *) objread, N, 0);
198                         sum += n;
199                 } while(sum < N && n != 0);
200         }
201         
202         // Read modified objects
203         if(fixed.nummod != 0) { // If pile contains modified objects 
204                 if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
205                         printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
206                         return 1;
207                 }
208                 sum = 0;
209                 do { // Recv the objs that are modified at Coordinator
210                         n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
211                         sum += n;
212                 } while (sum < fixed.sum_bytes && n != 0);
213         }
214
215         //Send control message as per all votes from all oids in the machine
216         if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
217                 printf("Handle req error\n");
218         }
219
220         //Read for new control message from Coordiator
221         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
222                 perror("Error in receiving control message");
223                 return 1;
224         }
225
226         switch(control) {
227                 case TRANS_ABORT:
228                         printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
229                         //send ack to coordinator
230                         sendctrl = TRANS_SUCESSFUL;
231                         if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
232                                 perror("Error sending ACK to coordinator\n");
233                                 return 1;
234                         }
235                         //Mark all ref counts as 1 and do garbage collection
236                         ptr = modptr;
237                         for(i = 0; i< fixed.nummod; i++) {
238                                 tmp_header = (objheader_t *)ptr;
239                                 tmp_header->rcount = 1;
240                                 ptr += sizeof(objheader_t) + classsize[tmp_header->type];
241                         }
242                         //Unlock objects that was locked in this machine due to this transaction
243                         for(i = 0; i< transinfo->numlocked; i++) {
244                                 header = mhashSearch(transinfo->objlocked[i]);// find the header address
245                                 ((objheader_t *)header)->status &= ~(LOCK);             
246                         }
247                         ptr = NULL;
248                         return 0;
249                 case TRANS_COMMIT:
250                         printf("DEBUG -> Recv TRANS_COMMIT from Coordinator accept_fd = %d\n", acceptfd);
251                         if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
252                                 printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
253                         }
254                         break;
255                 case TRANS_ABORT_BUT_RETRY_COMMIT:
256                         printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
257                         //Process again after waiting for sometime and on prev control message sent
258                         sleep(2);
259                         switch(prevctrl) {
260                                 case TRANS_AGREE:
261                                         sendctrl = TRANS_AGREE;
262                                         if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
263                                                 perror("Error sending ACK to coordinator\n");
264                                         }
265                                         break;
266                                 case TRANS_SOFT_ABORT:
267                                         if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
268                                                 printf("Handle req error\n");
269                                         }
270                                         //If no change in previous control message that was sent then ABORT transaction
271                                         if(newctrl == TRANS_SOFT_ABORT){
272                                                 //Send ABORT
273                                                 newctrl = TRANS_DISAGREE;
274                                                 if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
275                                                         perror("Error sending ACK to coordinator\n");
276                                                 }
277                                                 //Set the reference count of the object to 1 in mainstore for garbage collection
278                                                 ptr = modptr;
279                                                 for(i = 0; i< fixed.nummod; i++) {
280                                                         tmp_header = (objheader_t *) ptr;
281                                                         tmp_header->rcount = 1;
282                                                         ptr += sizeof(objheader_t) + classsize[tmp_header->type];
283                                                 }
284                                                 //Unlock objects that was locked in this machine due to this transaction
285                                                 for(i = 0; i< transinfo->numlocked; i++) {
286                                                         ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
287                                                         ((objheader_t *)ptr)->status &= ~(LOCK);                
288                                                 }
289                                         } else if(newctrl == TRANS_AGREE) {
290                                                 newctrl = TRANS_AGREE;
291                                                 //Send new control message
292                                                 if(send((int)acceptfd, &newctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
293                                                         perror("Error sending ACK to coordinator\n");
294                                                 }
295                                         }
296
297                                         break;
298                         }
299
300                         break;
301                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
302                         //TODO expect another transrequest from client
303                         printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
304                         break;
305                 default:
306                         printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
307                         //TODO Use fixed.trans_id  TID since Client may have died
308                         break;
309         }
310         //Free memory
311         printf("DEBUG -> Freeing...");
312         fflush(stdout);
313         if (transinfo->objmod != NULL) {
314                 free(transinfo->objmod);
315                 transinfo->objmod = NULL;
316         }
317         if (transinfo->objlocked != NULL) {
318                 free(transinfo->objlocked);
319                 transinfo->objlocked = NULL;
320         }
321         if (transinfo->objnotfound != NULL) {
322                 free(transinfo->objnotfound);
323                 transinfo->objnotfound = NULL;
324         }
325         return 0;
326 }
327
328 //This function runs a decision after all objects are weighed under one of the 4 possibilities 
329 //and returns the appropriate control message to the Ccordinator 
330 char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
331         int val;
332         short version;
333         char control = 0, ctrlmissoid, *ptr;
334         int i, j = 0;
335         unsigned int oid;
336         unsigned int *oidnotfound, *oidlocked, *oidmod;
337
338         oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
339         oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
340         oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
341         // Counters and arrays to formulate decision on control message to be sent
342         int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
343         int objmodnotfound = 0, nummodfound = 0;
344         void *mobj;
345         objheader_t *headptr;
346         
347         //Process each object present in the pile 
348         ptr = modptr;
349         //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
350         fflush(stdout);
351         //Process each oid in the machine pile/ group
352         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
353                 if (i < fixed->numread) {//Object is read
354                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
355                         incr *= i;
356                         oid = *((unsigned int *)(objread + incr));
357                         incr += sizeof(unsigned int);
358                         version = *((short *)(objread + incr));
359                 } else {//Obj is modified
360                         headptr = (objheader_t *) ptr;
361                         oid = headptr->oid;
362                         oidmod[objmod] = oid;//Array containing modified oids
363                         objmod++;
364                         version = headptr->version;
365                         ptr += sizeof(objheader_t) + classsize[headptr->type];
366                 }
367                 //Check if object is still present in the machine since the beginning of TRANS_REQUEST
368                 if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
369                         //Save the oids not found for later use
370                         oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
371                         objnotfound++;
372                 } else { // If obj found in machine (i.e. has not moved)
373                         //Check if obj is locked
374                         if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {           
375                                 if (version == ((objheader_t *)mobj)->version) {      // If version match
376                                         v_matchlock++;
377                                 } else {//If versions don't match ..HARD ABORT
378                                         v_nomatch++;
379                                         //send TRANS_DISAGREE to Coordinator
380                                         control = TRANS_DISAGREE;
381                                         if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
382                                                 perror("Error in sending control to the Coordinator\n");
383                                                 return 0;
384                                         }
385                                         printf("DEBUG -> Sending TRANS_DISAGREE\n");
386                                         return control;
387                                 }
388                         } else {//Obj is not locked , so lock object
389                                 ((objheader_t *)mobj)->status |= LOCK;
390                                 //FOR TESTING
391                                 sleep(1);
392                                 //Save all object oids that are locked on this machine during this transaction request call
393                                 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
394                                 printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid);
395                                 objlocked++;
396                                 if (version == ((objheader_t *)mobj)->version) { //If versions match
397                                         v_matchnolock++;
398                                 } else { //If versions don't match
399                                         v_nomatch++;
400                                         //send TRANS_DISAGREE to Coordinator
401                                         control = TRANS_DISAGREE;
402                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
403                                                 perror("Error in sending control to the Coordinator\n");
404                                                 return 0;
405                                         }
406                                         printf("DEBUG -> Sending TRANS_DISAGREE\n");
407                                         return control;
408                                 }
409                         }
410                 }
411         }
412
413         printf("No of objs locked = %d\n", objlocked);
414         printf("No of v_nomatch = %d\n", v_nomatch);
415         printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
416         printf("No of objs v_match but had locks before = %d\n", v_matchlock);
417         printf("No of objs not found = %d\n", objnotfound);
418         printf("No of objs modified but not found = %d\n", objmodnotfound);
419
420         //Decide what control message(s) to send
421         //Cond to send TRANS_AGREE
422         if(v_matchnolock == fixed->numread + fixed->nummod) {
423                 //send TRANS_AGREE to Coordinator
424                 control = TRANS_AGREE;
425                 if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
426                         perror("Error in sending control to Coordinator\n");
427                         return 0;
428                 }
429                 printf("DEBUG -> Sending TRANS_AGREE\n");
430         }
431         //Condition to send TRANS_SOFT_ABORT
432         if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
433                 //send TRANS_SOFT_ABORT to Coordinator
434                 control = TRANS_SOFT_ABORT;
435                 char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
436                 *((int*)&msg[1])=objnotfound;
437
438                 printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
439                 //send number of oids not found and the missing oids 
440                 if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
441                         perror("Error in sending no of objects that are not found\n");
442                         return 0;
443                 }
444                 if(objnotfound != 0) { 
445                   int size=sizeof(unsigned int)*objnotfound;
446                   if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
447                                 perror("Error in sending objects that are not found\n");
448                                 return 0;
449                         }
450                 }
451         }
452         
453         //Do the following when TRANS_DISAGREE is sent
454         if(control == TRANS_DISAGREE) {
455                 //Set the reference count of the object to 1 in mainstore for garbage collection
456                 ptr = modptr;
457                 for(i = 0; i< fixed->nummod; i++) {
458                         headptr = (objheader_t *) ptr;
459                         headptr->rcount = 1;
460                         ptr += sizeof(objheader_t) + classsize[headptr->type];
461                 }
462                 //Unlock objects that was locked in the trans
463                 for(i = 0; i< objlocked ; i++) {
464                         mobj = mhashSearch(oidlocked[i]);// find the header address
465                         ((objheader_t *)mobj)->status &= ~(LOCK);               
466                 }       
467         }       
468
469         //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
470         transinfo->objmod = oidmod;
471         transinfo->objlocked = oidlocked;
472         transinfo->objnotfound = oidnotfound;
473         transinfo->modptr = modptr;
474         transinfo->nummod = fixed->nummod;
475         transinfo->numlocked = objlocked;
476         transinfo->numnotfound = objnotfound;
477         
478         return control;
479 }
480
481 //Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
482 int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
483         objheader_t *header;
484         int i = 0, offset = 0;
485         char control;
486         //Process each modified object saved in the mainobject store
487         for(i=0; i<transinfo->nummod; i++) {
488                 if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
489                         printf("mhashserach returns NULL\n");
490                 }
491                 //change reference count of older address and free space in objstr ??
492                 header->rcount = 1; //Not sure what would be th val
493                 //change ptr address in mhash table
494                 printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]);
495                 mhashRemove(transinfo->objmod[i]);
496                 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
497                 offset += sizeof(objheader_t) + classsize[header->type];
498                 //update object version
499                 header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
500                 header->version += 1; 
501         }
502         for(i=0; i<transinfo->numlocked; i++) {
503                 //unlock objects
504                 header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
505                 header->status &= ~(LOCK);
506         }
507
508         //TODO Update location lookup table
509
510         //send ack to coordinator
511         control = TRANS_SUCESSFUL;
512         //FOR TESTING
513         printf("DEBUG-> Transaction is SUCCESSFUL \n");
514         if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
515                 perror("Error sending ACK to coordinator\n");
516         }
517         
518         return 0;
519 }
520