added new testcases
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <pthread.h>
5 #include <netdb.h>
6 #include <fcntl.h>
7 #include "dstm.h"
8 #include "mlookup.h"
9 #include "llookup.h"
10
11 #define LISTEN_PORT 2156 //TCP port that dstmListen binds to and listens on
12 #define BACKLOG 10 //max pending connections
13 #define RECEIVE_BUFFER_SIZE 2048 //size in bytes of the receive buffer declared in dstmAccept
14
15 extern int classsize[]; //defined outside this file; indexed by an object header's type and added to sizeof(objheader_t) to get an object's total size
16
17 objstr_t *mainobjstore; //main object store, created in dstmInit; readClientReq allocates space for incoming modified objects from it
18
19 int dstmInit(void)
20 {
21         //Initialize main object store
22         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
23         if (mhashCreate(HASH_SIZE, LOADFACTOR))
24                 return 1; //failure
25         
26         if (lhashCreate(HASH_SIZE, LOADFACTOR))
27                 return 1; //failure
28         
29         //pthread_t threadListen;
30         //pthread_create(&threadListen, NULL, dstmListen, NULL);
31         
32         return 0;
33 }
34
35 void *dstmListen()
36 {
37         int listenfd, acceptfd;
38         struct sockaddr_in my_addr;
39         struct sockaddr_in client_addr;
40         socklen_t addrlength = sizeof(struct sockaddr);
41         pthread_t thread_dstm_accept;
42         int i;
43
44         listenfd = socket(AF_INET, SOCK_STREAM, 0);
45         if (listenfd == -1)
46         {
47                 perror("socket");
48                 exit(1);
49         }
50
51         my_addr.sin_family = AF_INET;
52         my_addr.sin_port = htons(LISTEN_PORT);
53         my_addr.sin_addr.s_addr = INADDR_ANY;
54         memset(&(my_addr.sin_zero), '\0', 8);
55
56         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
57         {
58                 perror("bind");
59                 exit(1);
60         }
61         
62         if (listen(listenfd, BACKLOG) == -1)
63         {
64                 perror("listen");
65                 exit(1);
66         }
67
68         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
69         while(1)
70         {
71                 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
72                 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
73         }
74         pthread_exit(NULL);
75 }
76
77 void *dstmAccept(void *acceptfd)
78 {
79         int numbytes,i, val;
80         unsigned int oid;
81         char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
82         char *ptr;
83         void *srcObj;
84         objheader_t *h;
85         trans_commit_data_t transinfo;
86         
87         int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
88
89         printf("Recieved connection: fd = %d\n", (int)acceptfd);
90         recv((int)acceptfd, &control, sizeof(char), 0);
91         switch(control) {
92                 case READ_REQUEST:
93                         printf("DEBUG -> Recv READ_REQUEST from Coordinator\n");
94                         recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
95                         srcObj = mhashSearch(oid);
96                         h = (objheader_t *) srcObj;
97                         size = sizeof(objheader_t) + sizeof(classsize[h->type]);
98                         if (h == NULL) {
99                                 ctrl = OBJECT_NOT_FOUND;
100                                 if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
101                                         perror("Error sending control msg to coordinator\n");
102                                 }
103                         } else {
104                                 //char responsemessage[sizeof(char)+sizeof(int)];
105                                 /* Type */
106                                 ctrl = OBJECT_FOUND;
107                                 if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
108                                         perror("Error sending control msg to coordinator\n");
109                                 }
110
111                                 //responsemessage[0]=OBJECT_FOUND;
112                                 /* Size of object */
113                                 //*((int *)(&responsemessage[1])) = sizeof(objheader_t) + classsize[h->type];
114                                 //if(send((int)acceptfd, &responsemessage, sizeof(responsemessage), 0) < 0) {
115                                 //      perror("Error sending control msg to coordinator\n");
116                                 //}
117
118                                 /* Size of object */
119                                 if(send((int)acceptfd, &size, sizeof(int), 0) < 0) {
120                                         perror("Error sending size of object to coordinator\n");
121                                 }
122                                 if(send((int)acceptfd, h, size, 0) < 0) {
123                                         perror("Error in sending object\n");
124                                 }
125                         }
126                         break;
127                 
128                 case READ_MULT_REQUEST:
129                         printf("DEBUG-> READ_MULT_REQUEST\n");
130                         break;
131         
132                 case MOVE_REQUEST:
133                         printf("DEBUG -> MOVE_REQUEST\n");
134                         break;
135
136                 case MOVE_MULT_REQUEST:
137                         printf("DEBUG -> MOVE_MULT_REQUEST\n");
138                         break;
139
140                 case TRANS_REQUEST:
141                         printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
142                         if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
143                                 printf("Error in readClientReq\n");
144                         }
145                         break;
146
147                 default:
148                         printf("Error receiving\n");
149         }
150         if (close((int)acceptfd) == -1)
151         {
152                 perror("close");
153         }
154         else
155                 printf("Closed connection: fd = %d\n", (int)acceptfd);
156         
157         //Free memory
158         free(transinfo.objmod);
159         free(transinfo.objlocked);
160         free(transinfo.objnotfound);
161         pthread_exit(NULL);
162 }
163
164 int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
165         char *ptr, control, prevctrl, sendctrl, newctrl;
166         void *modptr, *header;
167         objheader_t *tmp_header;
168         fixed_data_t fixed;
169         int sum = 0, i, N, n, val;
170
171         //Reads to process the TRANS_REQUEST protocol further
172         // Read fixed_data
173         N = sizeof(fixed) - 1;
174         ptr = (char *)&fixed;;
175         fixed.control = TRANS_REQUEST;
176         do {
177                 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
178         //      printf("DEBUG -> 1. Reading %d bytes \n", n);
179                 sum += n;
180         } while(sum < N && n != 0); 
181
182         //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
183         // Read list of mids
184         int mcount = fixed.mcount;
185         N = mcount * sizeof(unsigned int);
186         unsigned int listmid[mcount];
187         ptr = (char *) listmid;
188         sum = 0;
189         do {
190                 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
191         //      printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
192                 sum += n;
193         } while(sum < N && n != 0);
194
195         // Read oid and version tuples
196         int numread = fixed.numread;
197         N = numread * (sizeof(unsigned int) + sizeof(short));
198         char objread[N];
199         sum = 0;
200         do {
201                 n = recv((int)acceptfd, (void *) objread, N, 0);
202         //      printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
203                 sum += n;
204         } while(sum < N && n != 0);
205         //printf("DEBUG -> %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
206
207         // Read modified objects
208         if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
209                 printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
210                 return 1;
211         }
212         sum = 0;
213         do {
214                 n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
215                 //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
216                 sum += n;
217         } while (sum < fixed.sum_bytes && n != 0);
218         
219         //Send control message as per all votes from all oids in the machine
220         if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
221                 printf("Handle req error\n");
222         }
223                 
224         //Read for new control message from Coordiator
225         recv((int)acceptfd, &control, sizeof(char), 0);
226         switch(control) {
227                 case TRANS_ABORT:
228                         printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
229                         //send ack to coordinator
230                         sendctrl = TRANS_SUCESSFUL;
231                         if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
232                                 perror("Error sending ACK to coordinator\n");
233                         }
234                         //Mark all ref counts as 1 and do garbage collection
235                         ptr = modptr;
236                         for(i = 0; i< fixed.nummod; i++) {
237                                 tmp_header = (objheader_t *)ptr;
238                                 tmp_header->rcount = 1;
239                                 ptr += sizeof(objheader_t) + classsize[tmp_header->type];
240                         }
241                         //Unlock objects that was locked in this machine due to this transaction
242                         for(i = 0; i< transinfo->numlocked; i++) {
243                                 header = mhashSearch(transinfo->objlocked[i]);// find the header address
244                                 ((objheader_t *)header)->status &= ~(LOCK);             
245                         }
246                         ptr = NULL;
247                         return 0;
248
249                 case TRANS_COMMIT:
250                         printf("DEBUG -> Recv TRANS_COMMIT from Coordinator\n");
251                         if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
252                                 printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
253                         }
254                         break;
255                 case TRANS_ABORT_BUT_RETRY_COMMIT:
256                         printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
257                         //Process again after waiting for sometime and on prev control message sent
258                         switch(prevctrl) {
259                                 case TRANS_AGREE:
260                                         sendctrl = TRANS_AGREE;
261                                         if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
262                                                 perror("Error sending ACK to coordinator\n");
263                                         }
264                                         sleep(5);
265                                         break;
266                                 case TRANS_SOFT_ABORT:
267                                         if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
268                                                 printf("Handle req error\n");
269                                         }
270                                         if(newctrl == prevctrl){
271                                                 //Send ABORT
272                                                 newctrl = TRANS_DISAGREE;
273                                                 if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
274                                                         perror("Error sending ACK to coordinator\n");
275                                                 }
276                                                 //Set the reference count of the object to 1 in mainstore for garbage collection
277                                                 ptr = modptr;
278                                                 for(i = 0; i< fixed.nummod; i++) {
279                                                         tmp_header = (objheader_t *) ptr;
280                                                         tmp_header->rcount = 1;
281                                                         ptr += sizeof(objheader_t) + classsize[tmp_header->type];
282                                                 }
283                                                 //Unlock objects that was locked in this machine due to this transaction
284                                                 for(i = 0; i< transinfo->numlocked; i++) {
285                                                         ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
286                                                         ((objheader_t *)ptr)->status &= ~(LOCK);                
287                                                 }
288                                                 return 0;
289                                         } else {
290                                                 //Send new control message
291                                                 if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
292                                                         perror("Error sending ACK to coordinator\n");
293                                                 }
294                                         }
295                                                 
296                                         break;
297                         }
298                         
299                         break;
300                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
301                         //TODO expect another transrequest from client
302                         printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
303                         break;
304                 default:
305                         printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
306                         //TODO Use fixed.trans_id  TID since Client may have died
307                         break;
308         }
309
310         return 0;
311 }
312
313 //This function runs a decision after all objects are weighed under one of the 4 possibilities 
314 //and returns the appropriate control message to the Ccordinator 
315 char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
316         short version;
317         char control = 0, ctrlmissoid, *ptr;
318         int i, j = 0;
319         unsigned int oid;
320         unsigned int *oidnotfound, *oidlocked, *oidmod;
321
322         oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
323         oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
324         oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
325
326         // Counters and arrays to formulate decision on control message to be sent
327         int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
328         int objmodnotfound = 0, nummodfound = 0;
329         void *mobj;
330         objheader_t *headptr;
331         
332         //Process each object present in the pile 
333         ptr = modptr;
334         printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
335         fflush(stdout);
336         //Process each oid in the machine pile/ group
337         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
338                 if (i < fixed->numread) {//Object is read
339                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
340                         incr *= i;
341                         oid = *((unsigned int *)(objread + incr));
342                         incr += sizeof(unsigned int);
343                         version = *((short *)(objread + incr));
344                 } else {//Obj is modified
345                         headptr = (objheader_t *) ptr;
346                         oid = headptr->oid;
347                         oidmod[objmod] = oid;//Array containing modified oids
348                         objmod++;
349                         version = headptr->version;
350                         ptr += sizeof(objheader_t) + classsize[headptr->type];
351                 }
352                 //Check if object is still present in the machine since the beginning of TRANS_REQUEST
353                 if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
354                         //Save the oids not found for later use
355                         oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
356                         objnotfound++;
357                 } else { // If obj found in machine (i.e. has not moved)
358                         //Check if obj is locked
359                         if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {           
360                                 if (version == ((objheader_t *)mobj)->version) {      // If version match
361                                         v_matchlock++;
362                                 } else {//If versions don't match ..HARD ABORT
363                                         v_nomatch++;
364                                         //send TRANS_DISAGREE to Coordinator
365                                         control = TRANS_DISAGREE;
366                                         write(acceptfd, &control, sizeof(char));
367                                         printf("DEBUG -> Sending TRANS_DISAGREE\n");
368                                         return control;
369                                 }
370                         } else {//Obj is not locked , so lock object
371                                 ((objheader_t *)mobj)->status |= LOCK;
372                                 //Save all object oids that are locked on this machine during this transaction request call
373                                 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
374                                 printf("DEBUG-> Object to be locked is %d\n", ((objheader_t *)mobj)->oid);
375                                 objlocked++;
376                                 if (version == ((objheader_t *)mobj)->version) { //If versions match
377                                         v_matchnolock++;
378                                 } else { //If versions don't match
379                                         v_nomatch++;
380                                         //send TRANS_DISAGREE to Coordinator
381                                         control = TRANS_DISAGREE;
382                                         write(acceptfd, &control, sizeof(char));
383                                         printf("DEBUG -> Sending TRANS_DISAGREE\n");
384                                         return control;
385                                 }
386                         }
387                 }
388         }
389
390         printf("No of objs locked = %d\n", objlocked);
391         printf("No of v_nomatch = %d\n", v_nomatch);
392         printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
393         printf("No of objs v_match but had locks before = %d\n", v_matchlock);
394         printf("No of objs not found = %d\n", objnotfound);
395         printf("No of objs modified but not found = %d\n", objmodnotfound);
396
397         //Decide what control message(s) to send
398         if(v_matchnolock == fixed->numread + fixed->nummod) {
399                 //send TRANS_AGREE to Coordinator
400                 control = TRANS_AGREE;
401                 write(acceptfd, &control, sizeof(char));
402                 printf("DEBUG -> Sending TRANS_AGREE\n");
403         }
404
405         if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
406                 //send TRANS_SOFT_ABORT to Coordinator
407                 control = TRANS_SOFT_ABORT;
408                 write(acceptfd, &control, sizeof(char));
409                 printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
410                 //send number of oids not found and the missing oids 
411                 write(acceptfd, &objnotfound, sizeof(int));
412                 if(objnotfound != 0) 
413                         write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
414         }
415         
416         //Do the following when TRANS_DISAGREE is sent
417         if(control == TRANS_DISAGREE) {
418                 //Set the reference count of the object to 1 in mainstore for garbage collection
419                 ptr = modptr;
420                 for(i = 0; i< fixed->nummod; i++) {
421                         headptr = (objheader_t *) ptr;
422                         headptr->rcount = 1;
423                         ptr += sizeof(objheader_t) + classsize[headptr->type];
424                 }
425                 //Unlock objects that was locked in the trans
426                 for(i = 0; i< objlocked ; i++) {
427                         mobj = mhashSearch(oidlocked[i]);// find the header address
428                         ((objheader_t *)mobj)->status &= ~(LOCK);               
429                 }       
430         }       
431
432         //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
433         transinfo->objmod = oidmod;
434         transinfo->objlocked = oidlocked;
435         transinfo->objnotfound = oidnotfound;
436         transinfo->modptr = modptr;
437         transinfo->nummod = fixed->nummod;
438         transinfo->numlocked = objlocked;
439         transinfo->numnotfound = objnotfound;
440         
441         return control;
442 }
443
444 //Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
445 int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
446         objheader_t *header;
447         int i = 0, offset = 0;
448         char control;
449         //Process each modified object saved in the mainobject store
450         for(i=0; i<transinfo->nummod; i++) {
451                 if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
452                         printf("mhashserach returns NULL\n");
453                 }
454                 //change reference count of older address and free space in objstr ??
455                 header->rcount = 1; //Not sure what would be th val
456                 //change ptr address in mhash table
457                 mhashRemove(transinfo->objmod[i]);
458                 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
459                 offset += sizeof(objheader_t) + classsize[header->type];
460                 //update object version
461                 header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
462                 header->version += 1; 
463         }
464         for(i=0; i<transinfo->numlocked; i++) {
465                 //unlock objects
466                 header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
467                 header->status &= ~(LOCK);
468         }
469
470         //TODO Update location lookup table
471
472         //send ack to coordinator
473         control = TRANS_SUCESSFUL;
474         if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
475                 perror("Error sending ACK to coordinator\n");
476         }
477
478         return 0;
479 }
480