server portions done except a few things
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <pthread.h>
5 #include <netdb.h>
6 #include <fcntl.h>
7 #include "dstm.h"
8 #include "mlookup.h"
9 #include "llookup.h"
10
11 #define LISTEN_PORT 2156
12 #define BACKLOG 10 //max pending connections
13 #define RECEIVE_BUFFER_SIZE 2048
14
15 extern int classsize[];
16
17 objstr_t *mainobjstore;
18
19 int dstmInit(void)
20 {
21         //todo:initialize main object store
22         //do we want this to be a global variable, or provide
23         //separate access funtions and hide the structure?
24         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
25         if (mhashCreate(HASH_SIZE, LOADFACTOR))
26                 return 1; //failure
27         
28         if (lhashCreate(HASH_SIZE, LOADFACTOR))
29                 return 1; //failure
30         
31         //pthread_t threadListen;
32         //pthread_create(&threadListen, NULL, dstmListen, NULL);
33         
34         return 0;
35 }
36
37 void *dstmListen()
38 {
39         int listenfd, acceptfd;
40         struct sockaddr_in my_addr;
41         struct sockaddr_in client_addr;
42         socklen_t addrlength = sizeof(struct sockaddr);
43         pthread_t thread_dstm_accept;
44         int i;
45
46         listenfd = socket(AF_INET, SOCK_STREAM, 0);
47         if (listenfd == -1)
48         {
49                 perror("socket");
50                 exit(1);
51         }
52
53         my_addr.sin_family = AF_INET;
54         my_addr.sin_port = htons(LISTEN_PORT);
55         my_addr.sin_addr.s_addr = INADDR_ANY;
56         memset(&(my_addr.sin_zero), '\0', 8);
57
58         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
59         {
60                 perror("bind");
61                 exit(1);
62         }
63         
64         if (listen(listenfd, BACKLOG) == -1)
65         {
66                 perror("listen");
67                 exit(1);
68         }
69
70         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
71         while(1)
72         {
73                 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
74                 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
75         }
76         pthread_exit(NULL);
77 }
78
79 void *dstmAccept(void *acceptfd)
80 {
81         int numbytes,i, val;
82         unsigned int oid;
83         char buffer[RECEIVE_BUFFER_SIZE], control;
84         char *ptr;
85         void *srcObj;
86         objheader_t *h;
87         
88         int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
89
90         printf("Recieved connection: fd = %d\n", (int)acceptfd);
91         recv((int)acceptfd, &control, sizeof(char), 0);
92         switch(control) {
93                 case READ_REQUEST:
94                         recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
95                         srcObj = mhashSearch(oid);
96                         h = (objheader_t *) srcObj;
97                         if (h == NULL) {
98                                 buffer[0] = OBJECT_NOT_FOUND;
99                         } else {
100                                 buffer[0] = OBJECT_FOUND;
101                                 size = sizeof(objheader_t) + sizeof(classsize[h->type]);
102                                 memcpy(buffer+1, srcObj, size);
103                         }
104                         if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
105                                 perror("");
106                         }
107                         break;
108                 
109                 case READ_MULT_REQUEST:
110                         printf("DEBUG-> READ_MULT_REQUEST\n");
111                         break;
112         
113                 case MOVE_REQUEST:
114                         printf("DEBUG -> MOVE_REQUEST\n");
115                         break;
116
117                 case MOVE_MULT_REQUEST:
118                         printf("DEBUG -> MOVE_MULT_REQUEST\n");
119                         break;
120
121                 case TRANS_REQUEST:
122                         //printf("DEBUG -> TRANS_REQUEST\n");
123                         if((val = readClientReq((int)acceptfd)) == 1) {
124                                 printf("Error in readClientReq\n");
125                         }
126                         break;
127
128                 default:
129                         printf("Error receiving\n");
130         }
131         
132         //Read for new control message from Coordiator
133         recv((int)acceptfd, &control, sizeof(char), 0);
134         switch(control) {
135                 case TRANS_ABORT:
136                         printf("DEBUG -> TRANS_ABORT\n");
137                         write((int)acceptfd, &control, sizeof(char));
138                         break;
139
140                 case TRANS_COMMIT:
141                         printf("DEBUG -> TRANS_COMMIT\n");
142                         write((int)acceptfd, &control, sizeof(char));
143                         //TODO 
144                         //change ptr address in mhash table
145                         //unlock objects
146                         //update object version
147                         //change reference count of older address??
148                         //free space in objstr ??
149                         //Update location lookup table
150                         break;
151         }
152
153         if (close((int)acceptfd) == -1)
154         {
155                 perror("close");
156         }
157         else
158                 printf("Closed connection: fd = %d\n", (int)acceptfd);
159         
160         pthread_exit(NULL);
161 }
162
163 int readClientReq(int acceptfd) {
164         char *ptr, control;
165         void *modptr;
166         objheader_t *h, tmp_header;
167         fixed_data_t fixed;
168         int sum = 0, N, n;
169
170         // Read fixed_data
171         N = sizeof(fixed) - 1;
172         ptr = (char *)&fixed;;
173         fixed.control = TRANS_REQUEST;
174         do {
175                 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
176         //      printf("DEBUG -> 1. Reading %d bytes \n", n);
177                 sum += n;
178         } while(sum < N && n != 0); 
179
180         //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
181         // Read list of mids
182         int mcount = fixed.mcount;
183         N = mcount * sizeof(unsigned int);
184         unsigned int listmid[mcount];
185         ptr = (char *) listmid;
186         sum = 0;
187         do {
188                 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
189         //      printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
190                 sum += n;
191         } while(sum < N && n != 0);
192
193         // Read oid and version tuples
194         int numread = fixed.numread;
195         N = numread * (sizeof(unsigned int) + sizeof(short));
196         char objread[N];
197         sum = 0;
198         do {
199                 n = recv((int)acceptfd, (void *) objread, N, 0);
200         //      printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
201                 sum += n;
202         } while(sum < N && n != 0);
203         //printf("DEBUG -> %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
204
205         // Read modified objects
206         if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
207         //      printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
208                 return 1;
209         }
210         sum = 0;
211         do {
212                 n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
213                 //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
214                 sum += n;
215         } while (sum < fixed.sum_bytes && n != 0);
216         //Send control message as per all votes from the particpants
217         handleTransReq(acceptfd, &fixed, listmid, objread, modptr);
218
219         
220         
221         return 0;
222 }
223
224 //This function runs a decision after all objects are weighed under one of the 4 possibilities 
225 //and returns the appropriate control message to the Ccordinator 
226 int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, char *objread, void *modptr) {
227         short version;
228         char control, *ptr;
229         int i;
230         unsigned int oid, oidnotfound[fixed->numread + fixed->nummod], oidlocked[fixed->nummod + fixed->numread];
231         int objnotfound = 0, objlocked = 0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;// Counters to formulate decision on control message to be sent
232         void *mobj;
233         objheader_t *headptr;
234         objinfo_t objinfo[fixed->nummod + fixed->numread];// Structure that saves the possibility per object(if version match, object not found on machine etc)
235         
236         //Process each object present in the pile 
237         ptr = modptr;
238         //Process each oid in the machine pile/ group
239         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
240                 if (i < fixed->numread) {//Object is read
241                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
242                         incr *= i;
243                         oid = *((unsigned int *)(objread + incr));
244                         incr += sizeof(unsigned int);
245                         version = *((short *)(objread + incr));
246                 } else {//Obj is modified
247                         headptr = (objheader_t *) ptr;
248                         oid = headptr->oid;
249                         version = headptr->version;
250                         ptr += sizeof(objheader_t) + classsize[headptr->type];
251                 }
252                 //Check if object is still present in the machine since the beginning of TRANS_REQUEST
253                 if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
254                         objinfo[i].poss_val = OBJECT_NOT_FOUND;
255                         //Save the oids not found for later use
256                         oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
257                         objnotfound++;
258                 } else { // If obj found in machine (i.e. has not moved)
259                         //Check if obj is locked
260                         if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {           
261                                 if (version == ((objheader_t *)mobj)->version) {      // If version match
262                                         objinfo[i].poss_val = OBJ_LOCKED_BUT_VERSION_MATCH;
263                                         v_matchlock++;
264                                 } else {//If versions don't match ..HARD ABORT
265                                         objinfo[i].poss_val = VERSION_NO_MATCH;
266                                         v_nomatch++;
267                                         //send TRANS_DISAGREE to Coordinator
268                                         control = TRANS_DISAGREE;
269                                         write(acceptfd, &control, sizeof(char));
270                                         //TODO when TRANS_DISAGREE is sent
271                                         //Free space allocated in main objstore
272                                         //Unlock objects that was locked in the trans
273                                         return 0;
274                                 }
275                         } else {//Obj is not locked , so lock object
276                                 ((objheader_t *)mobj)->status |= LOCK;
277                                 //Save all object oids that are locked on this machine during this transaction request call
278                                 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
279                                 objlocked++;
280                                 if (version == ((objheader_t *)mobj)->version) { //If versions match
281                                         objinfo[i].poss_val = OBJ_UNLOCK_BUT_VERSION_MATCH;
282                                         v_matchnolock++;
283                                 } else { //If versions don't match
284                                         objinfo[i].poss_val = VERSION_NO_MATCH;
285                                         v_nomatch++;
286                                         //send TRANS_DISAGREE to Coordinator
287                                         control = TRANS_DISAGREE;
288                                         write(acceptfd, &control, sizeof(char));
289                                         return 0;
290                                 }
291                         }
292                 }
293         }
294
295         //Decide what control message(s) to send
296         if(v_matchnolock == fixed->numread + fixed->nummod) {
297                 //send TRANS_AGREE to Coordinator
298                 control = TRANS_AGREE;
299                 write(acceptfd, &control, sizeof(char));
300         }
301         
302         if(objnotfound > 0 && v_matchlock == 0 && v_nomatch == 0) {
303                 //send TRANS_AGREE_BUT_MISSING_OBJECTS to Coordinator
304                 control = TRANS_AGREE_BUT_MISSING_OBJECTS;
305                 write(acceptfd, &control, sizeof(char));
306                 //send missing oids  and number of oids not found with it
307                 write(acceptfd, &objnotfound, sizeof(int));
308                 write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
309         }
310         
311         if(v_matchlock > 0 && v_nomatch == 0) {
312                 //send TRANS_SOFT_ABORT to Coordinator
313                 control = TRANS_SOFT_ABORT;
314                 write(acceptfd, &control, sizeof(char));
315                 //send missing oids  and number of oids not found with it
316                 write(acceptfd, &objnotfound, sizeof(int));
317                 write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
318         }
319         
320         //TODO when TRANS_DISAGREE is sent
321         //Free space allocated in main objstore
322         //Unlock objects that was locked in the trans
323         if(control == TRANS_DISAGREE) {
324                 for(i = 0; i< objlocked ; i++) {
325                         mobj = mhashSearch(oidlocked[i]);// find the header address
326                         ((objheader_t *)mobj)->status &= ~(LOCK);               
327                 }       
328         }       
329         return 0;
330 }