Finished and tested complete TRANS_COMMIT process
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <netdb.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "dstm.h"
#include "mlookup.h"
#include "llookup.h"
10
11 #define LISTEN_PORT 2156 // TCP port dstmListen() binds to and listens on
12 #define BACKLOG 10 // backlog passed to listen(): max pending connections
13 #define RECEIVE_BUFFER_SIZE 2048 // size in bytes used for receive buffers
14
15 extern int classsize[]; // defined elsewhere; indexed by an object header's type, the value is added to sizeof(objheader_t) to get an object's total size
16
17 objstr_t *mainobjstore; // object store created by dstmInit(); space for incoming modified objects is allocated from it
18
19 int dstmInit(void)
20 {
21         //Initialize main object store
22         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
23         if (mhashCreate(HASH_SIZE, LOADFACTOR))
24                 return 1; //failure
25         
26         if (lhashCreate(HASH_SIZE, LOADFACTOR))
27                 return 1; //failure
28         
29         //pthread_t threadListen;
30         //pthread_create(&threadListen, NULL, dstmListen, NULL);
31         
32         return 0;
33 }
34
35 void *dstmListen()
36 {
37         int listenfd, acceptfd;
38         struct sockaddr_in my_addr;
39         struct sockaddr_in client_addr;
40         socklen_t addrlength = sizeof(struct sockaddr);
41         pthread_t thread_dstm_accept;
42         int i;
43
44         listenfd = socket(AF_INET, SOCK_STREAM, 0);
45         if (listenfd == -1)
46         {
47                 perror("socket");
48                 exit(1);
49         }
50
51         my_addr.sin_family = AF_INET;
52         my_addr.sin_port = htons(LISTEN_PORT);
53         my_addr.sin_addr.s_addr = INADDR_ANY;
54         memset(&(my_addr.sin_zero), '\0', 8);
55
56         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
57         {
58                 perror("bind");
59                 exit(1);
60         }
61         
62         if (listen(listenfd, BACKLOG) == -1)
63         {
64                 perror("listen");
65                 exit(1);
66         }
67
68         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
69         while(1)
70         {
71                 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
72                 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
73         }
74         pthread_exit(NULL);
75 }
76
77 void *dstmAccept(void *acceptfd)
78 {
79         int numbytes,i, val;
80         unsigned int oid;
81         char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
82         char *ptr;
83         void *srcObj;
84         objheader_t *h;
85         trans_commit_data_t transinfo;
86         
87         int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
88
89         printf("Recieved connection: fd = %d\n", (int)acceptfd);
90         recv((int)acceptfd, &control, sizeof(char), 0);
91         switch(control) {
92                 case READ_REQUEST:
93                         printf("DEBUG -> Recv READ_REQUEST from Coordinator\n");
94                         recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
95                         srcObj = mhashSearch(oid);
96                         h = (objheader_t *) srcObj;
97                         if (h == NULL) {
98                                 ctrl = OBJECT_NOT_FOUND;
99                         } else {
100                                 ctrl = OBJECT_FOUND;
101                                 size = sizeof(objheader_t) + sizeof(classsize[h->type]);
102                                 if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
103                                         perror("Error sending control msg to coordinator\n");
104                                 }
105                                 if(send((int)acceptfd, &size, sizeof(int), 0) < 0) {
106                                         perror("Error sending size of object to coordinator\n");
107                                 }
108                                 if(send((int)acceptfd, h, size, 0) < 0) {
109                                         perror("Error in sending object\n");
110                                 }
111                         }
112                         break;
113                 
114                 case READ_MULT_REQUEST:
115                         printf("DEBUG-> READ_MULT_REQUEST\n");
116                         break;
117         
118                 case MOVE_REQUEST:
119                         printf("DEBUG -> MOVE_REQUEST\n");
120                         break;
121
122                 case MOVE_MULT_REQUEST:
123                         printf("DEBUG -> MOVE_MULT_REQUEST\n");
124                         break;
125
126                 case TRANS_REQUEST:
127                         printf("DEBUG -> Recv TRANS_REQUEST from Coordinator\n");
128                         if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
129                                 printf("Error in readClientReq\n");
130                         }
131                         break;
132
133                 default:
134                         printf("Error receiving\n");
135         }
136         if (close((int)acceptfd) == -1)
137         {
138                 perror("close");
139         }
140         else
141                 printf("Closed connection: fd = %d\n", (int)acceptfd);
142         
143         pthread_exit(NULL);
144 }
145
146 int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
147         char *ptr, control, prevctrl, sendctrl;
148         void *modptr;
149         objheader_t *h, tmp_header;
150         fixed_data_t fixed;
151         int sum = 0, N, n, val;
152
153         //Reads to process the TRANS_REQUEST protocol further
154         // Read fixed_data
155         N = sizeof(fixed) - 1;
156         ptr = (char *)&fixed;;
157         fixed.control = TRANS_REQUEST;
158         do {
159                 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
160         //      printf("DEBUG -> 1. Reading %d bytes \n", n);
161                 sum += n;
162         } while(sum < N && n != 0); 
163
164         //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
165         // Read list of mids
166         int mcount = fixed.mcount;
167         N = mcount * sizeof(unsigned int);
168         unsigned int listmid[mcount];
169         ptr = (char *) listmid;
170         sum = 0;
171         do {
172                 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
173         //      printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
174                 sum += n;
175         } while(sum < N && n != 0);
176
177         // Read oid and version tuples
178         int numread = fixed.numread;
179         N = numread * (sizeof(unsigned int) + sizeof(short));
180         char objread[N];
181         sum = 0;
182         do {
183                 n = recv((int)acceptfd, (void *) objread, N, 0);
184         //      printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
185                 sum += n;
186         } while(sum < N && n != 0);
187         //printf("DEBUG -> %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
188
189         // Read modified objects
190         if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
191                 printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
192                 return 1;
193         }
194         sum = 0;
195         do {
196                 n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
197                 //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
198                 sum += n;
199         } while (sum < fixed.sum_bytes && n != 0);
200         
201         //Send control message as per all votes from the particpants
202         if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
203                 printf("Handle req error\n");
204         }
205                 
206         //Read for new control message from Coordiator
207         recv((int)acceptfd, &control, sizeof(char), 0);
208         switch(control) {
209                 case TRANS_ABORT:
210                         printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
211                         //send ack to coordinator
212                         sendctrl = TRANS_SUCESSFUL;
213                         if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
214                                 perror("Error sending ACK to coordinator\n");
215                         }
216                         break;
217
218                 case TRANS_COMMIT:
219                         printf("DEBUG -> Recv TRANS_COMMIT from Coordinator\n");
220                         if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
221                                 printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
222                         }
223                         break;
224                 case TRANS_ABORT_BUT_RETRY_COMMIT:
225                         printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator\n");
226                         //Process again after waiting for sometime and on prev control message sent
227                         switch(prevctrl) {
228                                 case TRANS_AGREE:
229                                         sleep(2);
230                                         break;
231                                 case TRANS_AGREE_BUT_MISSING_OBJECTS:
232                                         break;
233                                 case TRANS_SOFT_ABORT:
234                                         break;
235                         }
236                         //Try sending either agree or disagree after sometime
237                         //TODO
238                         //Wait in a blocking thread or something
239                         //Recv from client new listmid, mcount and pilecount
240                         //call 2 new functions that are similar to readClientReq and handleRequest
241                         break;
242                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
243                         printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator\n");
244                         break;
245                 default:
246                         printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
247                         break;
248         }
249
250         return 0;
251 }
252
253 //This function runs a decision after all objects are weighed under one of the 4 possibilities 
254 //and returns the appropriate control message to the Ccordinator 
255 char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
256         short version;
257         char control = 0, ctrlmissoid, *ptr, *oidmodnotfound;
258         int i, j = 0;
259         unsigned int oid;
260         unsigned int *oidnotfound, *oidlocked, *oidmod;
261
262         oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
263         oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
264         oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
265         oidmodnotfound = (char *) calloc(fixed->nummod, sizeof(char));
266
267         // Counters and arrays to formulate decision on control message to be sent
268         int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
269         int objmodnotfound = 0, nummodfound = 0;
270         void *mobj;
271         objheader_t *headptr;
272         //TODO remove this deadcode from here
273         objinfo_t objinfo[fixed->nummod + fixed->numread];// Structure that saves the possibility per object(if version match, object not found on machine etc)
274         
275         //Process each object present in the pile 
276         ptr = modptr;
277         //Process each oid in the machine pile/ group
278         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
279                 if (i < fixed->numread) {//Object is read
280                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
281                         incr *= i;
282                         oid = *((unsigned int *)(objread + incr));
283                         incr += sizeof(unsigned int);
284                         version = *((short *)(objread + incr));
285                 } else {//Obj is modified
286                         headptr = (objheader_t *) ptr;
287                         oid = headptr->oid;
288                         oidmod[objmod] = oid;//Array containing modified oids
289                         objmod++;
290                         version = headptr->version;
291                         ptr += sizeof(objheader_t) + classsize[headptr->type];
292                 }
293                 //Check if object is still present in the machine since the beginning of TRANS_REQUEST
294                 if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
295                         objinfo[i].poss_val = OBJECT_NOT_FOUND;
296                         if(i >= fixed->numread && (i < (fixed->nummod + fixed->numread))) {
297                                 oidmodnotfound[i - fixed->numread] = 1; //array keeps track of oids that are a subset of oidmod and found on machine 
298                                 objmodnotfound++;
299                         }
300                         //Save the oids not found for later use
301                         oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
302                         objnotfound++;
303                 } else { // If obj found in machine (i.e. has not moved)
304                         //Check if obj is locked
305                         if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {           
306                                 if (version == ((objheader_t *)mobj)->version) {      // If version match
307                                         objinfo[i].poss_val = OBJ_LOCKED_BUT_VERSION_MATCH;
308                                         v_matchlock++;
309                                 } else {//If versions don't match ..HARD ABORT
310                                         objinfo[i].poss_val = VERSION_NO_MATCH;
311                                         v_nomatch++;
312                                         //send TRANS_DISAGREE to Coordinator
313                                         control = TRANS_DISAGREE;
314                                         write(acceptfd, &control, sizeof(char));
315                                         printf("DEBUG -> Sending TRANS_DISAGREE\n");
316                                         return control;
317                                 }
318                         } else {//Obj is not locked , so lock object
319                                 ((objheader_t *)mobj)->status |= LOCK;
320                                 //Save all object oids that are locked on this machine during this transaction request call
321                                 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
322                                 objlocked++;
323                                 if (version == ((objheader_t *)mobj)->version) { //If versions match
324                                         objinfo[i].poss_val = OBJ_UNLOCK_BUT_VERSION_MATCH;
325                                         v_matchnolock++;
326                                 } else { //If versions don't match
327                                         objinfo[i].poss_val = VERSION_NO_MATCH;
328                                         v_nomatch++;
329                                         //send TRANS_DISAGREE to Coordinator
330                                         control = TRANS_DISAGREE;
331                                         write(acceptfd, &control, sizeof(char));
332                                         printf("DEBUG -> Sending TRANS_DISAGREE\n");
333                                         return control;
334                                 }
335                         }
336                 }
337         }
338
339         printf("No of objs locked = %d\n", objlocked);
340         printf("No of v_nomatch = %d\n", v_nomatch);
341         printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
342         printf("No of objs v_match but had locks before = %d\n", v_matchlock);
343         printf("No of objs not found = %d\n", objnotfound);
344         printf("No of objs modified but not found = %d\n", objmodnotfound);
345
346         //Decide what control message(s) to send
347         if(v_matchnolock == fixed->numread + fixed->nummod) {
348                 //send TRANS_AGREE to Coordinator
349                 control = TRANS_AGREE;
350                 write(acceptfd, &control, sizeof(char));
351                 printf("DEBUG -> Sending TRANS_AGREE\n");
352         }
353         
354         if(objnotfound > 0 && v_matchlock == 0 && v_nomatch == 0) {
355                 //send TRANS_AGREE_BUT_MISSING_OBJECTS to Coordinator
356                 control = TRANS_AGREE_BUT_MISSING_OBJECTS;
357                 write(acceptfd, &control, sizeof(char));
358                 printf("DEBUG -> Sending TRANS_AGREE_BUT_MISSING_OBJECTS\n");
359                 //send number of oids not found and the missing oids 
360                 write(acceptfd, &objnotfound, sizeof(int));
361                 write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
362         }
363         
364         if(v_matchlock > 0 && v_nomatch == 0) {
365                 //send TRANS_SOFT_ABORT to Coordinator
366                 control = TRANS_SOFT_ABORT;
367                 write(acceptfd, &control, sizeof(char));
368                 printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
369                 //send number of oids not found and the missing oids 
370                 write(acceptfd, &objnotfound, sizeof(int));
371                 if(objnotfound != 0) 
372                         write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
373         }
374         
375         //Do the following when TRANS_DISAGREE is sent
376         if(control == TRANS_DISAGREE) {
377                 //Set the reference count of the object to 1 in mainstore for garbage collection
378                 ptr = modptr;
379                 for(i = 0; i< fixed->nummod; i++) {
380                         headptr = (objheader_t *) ptr;
381                         headptr->rcount = 1;
382                         ptr += sizeof(objheader_t) + classsize[headptr->type];
383                 }
384                 //Unlock objects that was locked in the trans
385                 for(i = 0; i< objlocked ; i++) {
386                         mobj = mhashSearch(oidlocked[i]);// find the header address
387                         ((objheader_t *)mobj)->status &= ~(LOCK);               
388                 }       
389         }       
390
391         // List of objects that were sent as modified in the TRANS_REQUEST but are now not found on the machine         
392         nummodfound = fixed->nummod - objmodnotfound;
393         unsigned int oidmodfound[nummodfound];
394         for(i = 0; i< fixed->nummod; i++) {
395                 if(oidmodnotfound[i] == 0) {
396                         oidmodfound[j] = oidmod[i];
397                         j++;
398                 } 
399         }
400         //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
401         transinfo->objmod = oidmod;
402         transinfo->objlocked = oidlocked;
403         transinfo->modptr = modptr;
404         transinfo->nummod = fixed->nummod;
405         transinfo->numlocked = objlocked;
406         
407         return control;
408 }
409
410 //Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
411 int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
412         objheader_t *header;
413         int i = 0, offset = 0;
414         char control;
415         //Process each modified object saved in the mainobject store
416         for(i=0; i<transinfo->nummod; i++) {
417                 if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
418                         printf("mhashserach returns NULL\n");
419                 }
420                 //change reference count of older address and free space in objstr ??
421                 header->rcount = 1; //Not sure what would be th val
422                 //change ptr address in mhash table
423                 mhashRemove(transinfo->objmod[i]);
424                 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
425                 offset += sizeof(objheader_t) + classsize[header->type];
426                 //update object version
427                 header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
428                 header->version += 1; 
429         }
430         for(i=0; i<transinfo->numlocked; i++) {
431                 //unlock objects
432                 header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
433                 header->status &= ~(LOCK);
434         }
435
436         //TODO Update location lookup table
437
438         //send ack to coordinator
439         control = TRANS_SUCESSFUL;
440         if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
441                 perror("Error sending ACK to coordinator\n");
442         }
443
444         return 0;
445 }