Fixed bugs
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <pthread.h>
5 #include <netdb.h>
6 #include <fcntl.h>
7 #include "dstm.h"
8 #include "mlookup.h"
9 #include "llookup.h"
10
11 #define LISTEN_PORT 2156
12 #define BACKLOG 10 //max pending connections
13 #define RECEIVE_BUFFER_SIZE 2048
14
15 extern int classsize[];
16
17 objstr_t *mainobjstore;
18
19 int dstmInit(void)
20 {
21         //Initialize main object store
22         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
23         if (mhashCreate(HASH_SIZE, LOADFACTOR))
24                 return 1; //failure
25         
26         if (lhashCreate(HASH_SIZE, LOADFACTOR))
27                 return 1; //failure
28         
29         return 0;
30 }
31
32 void *dstmListen()
33 {
34         int listenfd, acceptfd;
35         struct sockaddr_in my_addr;
36         struct sockaddr_in client_addr;
37         socklen_t addrlength = sizeof(struct sockaddr);
38         pthread_t thread_dstm_accept;
39         int i;
40         int setsockflag=1;
41
42         listenfd = socket(AF_INET, SOCK_STREAM, 0);
43         if (listenfd == -1)
44         {
45                 perror("socket");
46                 exit(1);
47         }
48
49         if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
50           perror("socket");
51           exit(1);
52         }
53 #ifdef MAC
54         if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
55           perror("socket");
56           exit(1);
57         }
58 #endif
59
60         my_addr.sin_family = AF_INET;
61         my_addr.sin_port = htons(LISTEN_PORT);
62         my_addr.sin_addr.s_addr = INADDR_ANY;
63         memset(&(my_addr.sin_zero), '\0', 8);
64
65         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
66         {
67                 perror("bind");
68                 exit(1);
69         }
70         
71         if (listen(listenfd, BACKLOG) == -1)
72         {
73                 perror("listen");
74                 exit(1);
75         }
76
77         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
78         while(1)
79         {
80                 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
81                 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
82         }
83         pthread_exit(NULL);
84 }
85
86 void *dstmAccept(void *acceptfd)
87 {
88         int numbytes,i, val, retval;
89         unsigned int oid;
90         char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
91         char *ptr;
92         void *srcObj;
93         objheader_t *h;
94         trans_commit_data_t transinfo;
95         
96         int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
97
98         printf("Recieved connection: fd = %d\n", (int)acceptfd);
99         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
100                 if (retval == 0) {
101                         return; // Testing connection
102                 }
103                 perror("Error in receiving control from coordinator\n");
104                 return;
105         }
106         switch(control) {
107                 case READ_REQUEST:
108                         if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
109                                 perror("Error receiving object from cooridnator\n");
110                                 return;
111                         }
112                         srcObj = mhashSearch(oid);
113                         h = (objheader_t *) srcObj;
114                         size = sizeof(objheader_t) + sizeof(classsize[h->type]);
115                         if (h == NULL) {
116                                 ctrl = OBJECT_NOT_FOUND;
117                                 if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
118                                         perror("Error sending control msg to coordinator\n");
119                                 }
120                         } else {
121                                 /* Type */
122                           char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
123                           *((int *)&msg[1])=size;
124                           if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
125                             perror("Error sending size of object to coordinator\n");
126                           }
127                           if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
128                             perror("Error in sending object\n");
129                           }
130                         }
131                         break;
132                 
133                 case READ_MULT_REQUEST:
134                         printf("DEBUG-> READ_MULT_REQUEST\n");
135                         break;
136         
137                 case MOVE_REQUEST:
138                         printf("DEBUG -> MOVE_REQUEST\n");
139                         break;
140
141                 case MOVE_MULT_REQUEST:
142                         printf("DEBUG -> MOVE_MULT_REQUEST\n");
143                         break;
144
145                 case TRANS_REQUEST:
146                         printf("DEBUG -> Recv TRANS_REQUEST\n");
147                         if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
148                                 printf("Error in readClientReq\n");
149                         }
150                         break;
151
152                 default:
153                         printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
154         }
155         if (close((int)acceptfd) == -1)
156                 perror("close");
157         else 
158                 printf("Closed connection: fd = %d\n", (int)acceptfd);
159         
160         pthread_exit(NULL);
161 }
162
163 // Reads transaction request per thread
164 int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
165         char *ptr, control, prevctrl, sendctrl, newctrl;
166         void *modptr, *header;
167         objheader_t *tmp_header;
168         fixed_data_t fixed;
169         int sum = 0, i, N, n, val, retval;
170
171         //Reads to process the TRANS_REQUEST protocol further
172         // Read fixed_data
173         N = sizeof(fixed) - 1;
174         ptr = (char *)&fixed;;
175         fixed.control = TRANS_REQUEST;
176         do {
177                 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
178                 sum += n;
179         } while(sum < N && n != 0); 
180
181         // Read list of mids
182         int mcount = fixed.mcount;
183         N = mcount * sizeof(unsigned int);
184         unsigned int listmid[mcount];
185         ptr = (char *) listmid;
186         sum = 0;
187         do {
188                 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
189                 sum += n;
190         } while(sum < N && n != 0);
191
192         // Read oid and version tuples
193         int numread = fixed.numread;
194         N = numread * (sizeof(unsigned int) + sizeof(short));
195         char objread[N];
196         if(numread != 0) { //If pile contains more than one object to be read, 
197                           // keep reading all objects
198                 sum = 0;
199                 do {
200                         n = recv((int)acceptfd, (void *) objread, N, 0);
201                         sum += n;
202                 } while(sum < N && n != 0);
203         }
204         
205         // Read modified objects
206         if(fixed.nummod != 0) { // If pile contains more than one modified object,
207                                 // allocate new object store and recv all modified objects
208                 if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
209                         printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
210                         return 1;
211                 }
212                 sum = 0;
213                 do { // Recv the objs that are modified at Coordinator
214                         n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
215                         sum += n;
216                 } while (sum < fixed.sum_bytes && n != 0);
217         }
218
219         // Process the information available in the TRANS_REQUEST control
220         //Send control message as per all votes from all oids in the machine
221         if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
222                 printf("Handle Trans Req error %s, %d\n", __FILE__, __LINE__);
223                 return 1;
224         }
225         //Read for new control message from Coordiator
226         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
227                 perror("Error in receiving control message\n");
228                 return 1;
229         }
230
231         switch(control) {
232                 case TRANS_ABORT:
233                         //Mark all ref counts as 1 and do garbage collection
234                         ptr = modptr;
235                         for(i = 0; i< fixed.nummod; i++) {
236                                 tmp_header = (objheader_t *)ptr;
237                                 tmp_header->rcount = 1;
238                                 ptr += sizeof(objheader_t) + classsize[tmp_header->type];
239                         }
240                         //Unlock objects that was locked in this machine due to this transaction
241                         for(i = 0; i< transinfo->numlocked; i++) {
242                                 printf("DEBUG-> Unlocking objects\n");
243                                 header = mhashSearch(transinfo->objlocked[i]);// find the header address
244                                 ((objheader_t *)header)->status &= ~(LOCK);             
245                         }
246                 
247                         //send ack to coordinator
248                         printf("DEBUG -> Recv TRANS_ABORT\n");
249                         sendctrl = TRANS_SUCESSFUL;
250                         if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
251                                 perror("Error sending ACK to coordinator\n");
252                                 return 1;
253                         }
254                 
255                         ptr = NULL;
256                         return 0;
257                 case TRANS_COMMIT:
258                         printf("DEBUG -> Recv TRANS_COMMIT \n");
259                         if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
260                                 printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
261                         }
262                         break;
263
264                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
265                         //TODO expect another transrequest from client
266                         printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
267                         break;
268                 default:
269                         printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
270                         //TODO Use fixed.trans_id  TID since Client may have died
271                         break;
272         }
273         //Free memory
274         printf("DEBUG -> Freeing...\n");
275         fflush(stdout);
276         if (transinfo->objmod != NULL) {
277                 free(transinfo->objmod);
278                 transinfo->objmod = NULL;
279         }
280         if (transinfo->objlocked != NULL) {
281                 free(transinfo->objlocked);
282                 transinfo->objlocked = NULL;
283         }
284         if (transinfo->objnotfound != NULL) {
285                 free(transinfo->objnotfound);
286                 transinfo->objnotfound = NULL;
287         }
288         return 0;
289 }
290
291 //This function runs a decision after all objects involved in TRANS_REQUEST 
292 //and returns the appropriate control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT to the Ccordinator 
293 char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
294         int val;
295         short version;
296         char control = 0, ctrlmissoid, *ptr;
297         int i, j = 0;
298         unsigned int oid;
299         unsigned int *oidnotfound, *oidlocked, *oidmod;
300
301         oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
302         oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
303         oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
304         // Counters and arrays to formulate decision on control message to be sent
305         // version match or  no match
306         int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
307         int objmodnotfound = 0, nummodfound = 0;
308         void *mobj;
309         objheader_t *headptr;
310         
311         //Process each object present in the pile 
312         ptr = modptr;
313         
314         //Process each oid in the machine pile/ group per thread
315         //Should be a new function
316         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
317                 if (i < fixed->numread) {//Object is read
318                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
319                         incr *= i;
320                         oid = *((unsigned int *)(objread + incr));
321                         incr += sizeof(unsigned int);
322                         version = *((short *)(objread + incr));
323                 } else {//Obj is modified
324                         headptr = (objheader_t *) ptr;
325                         oid = headptr->oid;
326                         oidmod[objmod] = oid;//Array containing modified oids
327                         objmod++;
328                         version = headptr->version;
329                         ptr += sizeof(objheader_t) + classsize[headptr->type];
330                 }
331                 //Check if object is still present in the machine since the beginning of TRANS_REQUEST
332                 if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
333                         //Save the oids not found for later use
334                         oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
335                         objnotfound++;
336                 } else { // If obj found in machine (i.e. has not moved)
337                         //Check if obj is locked
338                         if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {           
339                                 if (version == ((objheader_t *)mobj)->version) {      // If version match
340                                         printf("DEBUG -> obj = %d locked\n", ((objheader_t *)mobj)->oid);
341                                         v_matchlock++;
342                                 } else {//If versions don't match ..HARD ABORT
343                                         v_nomatch++;
344                                         //send TRANS_DISAGREE to Coordinator
345                                         control = TRANS_DISAGREE;
346                                         if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
347                                                 perror("Error in sending control to the Coordinator\n");
348                                                 return 0;
349                                         }
350                                         printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd);
351                                         return control;
352                                 }
353                         } else {//Obj is not locked , so lock object
354                                 ((objheader_t *)mobj)->status |= LOCK;
355                                 // TESTING Add sleep to make transactions run for a long time such that 
356                                 // we can test for soft abort case
357                                 sleep(1);
358                                 //Save all object oids that are locked on this machine during this transaction request call
359                                 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
360                                 objlocked++;
361                                 if (version == ((objheader_t *)mobj)->version) { //If versions match
362                                         v_matchnolock++;
363                                 } else { //If versions don't match
364                                         v_nomatch++;
365                                         //send TRANS_DISAGREE to Coordinator
366                                         control = TRANS_DISAGREE;
367                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
368                                                 perror("Error in sending control to the Coordinator\n");
369                                                 return 0;
370                                         }
371                                         printf("DEBUG -> Sending TRANS_DISAGREE\n");
372                                         return control;
373                                 }
374                         }
375                 }
376         }
377         
378         //Decide what control message(s) to send
379         // Should be a new function
380         if(v_matchnolock == fixed->numread + fixed->nummod) {
381                 //send TRANS_AGREE to Coordinator
382                 control = TRANS_AGREE;
383                 if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
384                         perror("Error in sending control to Coordinator\n");
385                         return 0;
386                 }
387                 printf("DEBUG -> Sending TRANS_AGREE\n");
388         }
389         //Condition to send TRANS_SOFT_ABORT
390         if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
391                 control = TRANS_SOFT_ABORT;
392                 char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
393                 *((int*)&msg[1])= objnotfound;
394
395                 printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
396                 //Sending control message
397                 if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
398                         perror("Error in sending no of objects that are not found\n");
399                         return 0;
400                 }
401                 //send number of oids not found and the missing oids if objects are missing in the machine
402                 if(objnotfound != 0) { 
403                   int size = sizeof(unsigned int)*objnotfound;
404                   if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
405                                 perror("Error in sending objects that are not found\n");
406                                 return 0;
407                         }
408                 }
409         }
410         
411         //Do the following when TRANS_DISAGREE is sent
412         if(control == TRANS_DISAGREE) {
413                 //Set the reference count of the object to 1 in mainstore for garbage collection
414                 ptr = modptr;
415                 for(i = 0; i< fixed->nummod; i++) {
416                         headptr = (objheader_t *) ptr;
417                         headptr->rcount = 1;
418                         ptr += sizeof(objheader_t) + classsize[headptr->type];
419                 }
420                 //Unlock objects that was locked in the trans
421                 for(i = 0; i< objlocked ; i++) {
422                         mobj = mhashSearch(oidlocked[i]);// find the header address
423                         ((objheader_t *)mobj)->status &= ~(LOCK);               
424                 }       
425         }       
426
427         //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
428         transinfo->objmod = oidmod;
429         transinfo->objlocked = oidlocked;
430         transinfo->objnotfound = oidnotfound;
431         transinfo->modptr = modptr;
432         transinfo->nummod = fixed->nummod;
433         transinfo->numlocked = objlocked;
434         transinfo->numnotfound = objnotfound;
435         
436         return control;
437 }
438
439 //Process oids in the TRANS_COMMIT requested by the participant and sends an ACK back to Coordinator
440 int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
441         objheader_t *header;
442         int i = 0, offset = 0;
443         char control;
444         //Process each modified object saved in the mainobject store
445         for(i=0; i<transinfo->nummod; i++) {
446                 if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
447                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
448                 }
449                 //change reference count of older address and free space in objstr ??
450                 header->rcount = 1; //Not sure what would be th val
451                 //change ptr address in mhash table
452                 printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]);
453                 mhashRemove(transinfo->objmod[i]);
454                 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
455                 offset += sizeof(objheader_t) + classsize[header->type];
456                 //update object version
457                 header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
458                 header->version += 1; 
459         }
460         for(i=0; i<transinfo->numlocked; i++) {
461                 //unlock objects
462                 header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
463                 header->status &= ~(LOCK);
464         }
465
466         //TODO Update location lookup table
467
468         //send ack to coordinator
469         control = TRANS_SUCESSFUL;
470         printf("DEBUG-> TRANS_SUCESSFUL\n");
471         if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
472                 perror("Error sending ACK to coordinator\n");
473         }
474         
475         return 0;
476 }
477