Buggy code, first packet lost during communication between server and client
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <pthread.h>
5 #include <netdb.h>
6 #include <fcntl.h>
7 #include "dstm.h"
8 #include "mlookup.h"
9 #include "llookup.h"
10
11 #define LISTEN_PORT 2156
12 #define BACKLOG 10 //max pending connections
13 #define RECEIVE_BUFFER_SIZE 2048
14
15 extern int classsize[];
16
17 objstr_t *mainobjstore;
18
19 int dstmInit(void)
20 {
21         //todo:initialize main object store
22         //do we want this to be a global variable, or provide
23         //separate access funtions and hide the structure?
24         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
25         if (mhashCreate(HASH_SIZE, LOADFACTOR))
26                 return 1; //failure
27         
28         if (lhashCreate(HASH_SIZE, LOADFACTOR))
29                 return 1; //failure
30         
31         //pthread_t threadListen;
32         //pthread_create(&threadListen, NULL, dstmListen, NULL);
33         
34         return 0;
35 }
36
37 void *dstmListen()
38 {
39         int listenfd, acceptfd;
40         struct sockaddr_in my_addr;
41         struct sockaddr_in client_addr;
42         socklen_t addrlength = sizeof(struct sockaddr);
43         pthread_t thread_dstm_accept;
44         int i;
45
46         listenfd = socket(AF_INET, SOCK_STREAM, 0);
47         if (listenfd == -1)
48         {
49                 perror("socket");
50                 exit(1);
51         }
52
53         my_addr.sin_family = AF_INET;
54         my_addr.sin_port = htons(LISTEN_PORT);
55         my_addr.sin_addr.s_addr = INADDR_ANY;
56         memset(&(my_addr.sin_zero), '\0', 8);
57
58         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
59         {
60                 perror("bind");
61                 exit(1);
62         }
63         
64         if (listen(listenfd, BACKLOG) == -1)
65         {
66                 perror("listen");
67                 exit(1);
68         }
69
70         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
71         while(1)
72         {
73                 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
74                 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
75         }
76         pthread_exit(NULL);
77 }
78
79 void *dstmAccept(void *acceptfd)
80 {
81         int numbytes,i,choice, oid;
82         char buffer[RECEIVE_BUFFER_SIZE], control;
83         void *srcObj;
84         objheader_t *h;
85         int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
86
87         printf("Recieved connection: fd = %d\n", (int)acceptfd);
88         while((numbytes = recv((int)acceptfd, (void *) buffer, sizeof(buffer), 0)) != 0) 
89         {
90                 printf("DEBUG -> dstmserver: numbytes = %d\n", numbytes);
91                 control = buffer[0];
92                 switch(control) {
93                         case READ_REQUEST:
94                                 printf("DEBUG -> READ_REQUEST\n");
95                                 oid = *((int *)(buffer+1));
96 #ifdef DEBUG1
97                                 printf("DEBUG -> Received oid is %d\n", oid);
98 #endif
99                                 srcObj = mhashSearch(oid);
100                                 h = (objheader_t *) srcObj;
101                                 if (h == NULL) {
102                                         buffer[0] = OBJECT_NOT_FOUND;
103                                 } else {
104                                         buffer[0] = OBJECT_FOUND;
105                                         size = sizeof(objheader_t) + sizeof(classsize[h->type]);
106                                         memcpy(buffer+1, srcObj, size);
107                                 }
108 #ifdef DEBUG1
109                                 printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type);
110 #endif
111
112                                 if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
113                                         perror("");
114                                 }
115                                 break;
116                         case READ_MULT_REQUEST:
117                                 printf("DEBUG-> READ_MULT_REQUEST\n");
118                                 break;
119                         case MOVE_REQUEST:
120                                 printf("DEBUG -> MOVE_REQUEST\n");
121                                 break;
122                         case MOVE_MULT_REQUEST:
123                                 printf("DEBUG -> MOVE_MULT_REQUEST\n");
124                                 break;
125                         case TRANS_REQUEST:
126                                 printf("DEBUG -> TRANS_REQUEST\n");
127                                 printf("Client sent %d\n",buffer[0]);
128         //                      handleTransReq(acceptfd, buffer);
129                                 break;
130                         case TRANS_ABORT:
131                                 printf("DEBUG -> TRANS_ABORT\n");
132                                 break;
133                         case TRANS_COMMIT:
134                                 printf("DEBUG -> TRANS_COMMIT\n");
135                                 printf("Client sent %d\n",buffer[0]);
136                                 //TODO copy the objects into the machine 
137                                 /*copy the object into the object store from its old 
138                                   location in the objstore(pointer to its header is already stored before)*/
139                                 break;
140                         default:
141                                 printf("Error receiving");
142                 }
143                 //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
144                 //printf("%s", buffer);
145         } 
146         if (close((int)acceptfd) == -1)
147         {
148                 perror("close");
149         }
150         else
151                 printf("Closed connection: fd = %d\n", (int)acceptfd);
152         pthread_exit(NULL);
153 }
154
155 //TOOD put __FILE__ __LINE__ for all error conditions
156 #if 0
157 int handleTransReq(int acceptfd, char *buf) {
158         short numread = 0, nummod = 0;
159         char control;
160         int offset = 0, size,i;
161         int objnotfound = 0, transdis = 0, transabort = 0, transagree = 0;
162         objheader_t *headptr = NULL;
163         objstr_t *tmpholder;
164         void *top, *mobj;
165         
166         char sendbuf[RECEIVE_BUFFER_SIZE];
167
168         control = buf[0];
169         offset = sizeof(fixed_data_t);
170         list = *((short *)(buf+offset));
171         offset += sizeof(short);
172         nummod = *((short *)(buf+offset));
173         offset += sizeof(short);
174         if (numread) {
175                 //Make an array to store the object headers for all objects that are only read
176                 if ((headptr = calloc(numread, sizeof(objheader_t))) == NULL) {
177                         perror("handleTransReq: Calloc error");
178                         return 1;
179                 }
180                 //Process each object id that is only read
181                 for (i = 0; i < numread; i++) {
182                         objheader_t *tmp;
183                         tmp = (objheader_t *) (buf + offset);
184                         //find if object is still present in the same machine since TRANS_REQUEST
185                         if ((mobj = mhashSearch(tmp->oid)) == NULL) {
186                                 objnotfound++;
187                                 /*
188                                 sendbuf[0] = OBJECT_NOT_FOUND;
189                                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
190                                         perror("");
191                                 }
192                                 */
193                                 } else { // If obj found in machine (i.e. has not moved)
194                                 //Check if obj is locked
195                                 if ((((objheader_t *)mobj)->status >> 3) == 1) {
196                                         //Check version of the object
197                                         if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
198                                                 transdis++;
199                                                 /*
200                                                 sendbuf[0] = TRANS_DISAGREE;
201                                                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
202                                                         perror("");
203                                                 }
204                                         */
205                                         } else {//If versions don't match ..HARD ABORT
206                                                 transabort++;
207                                                 /*
208                                                 sendbuf[0] = TRANS_DISAGREE_ABORT;
209                                                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
210                                                         perror("");
211                                                 }
212                                                 */
213                                         }
214                                 } else {// If object not locked then lock it
215                                         ((objheader_t *)mobj)->status |= LOCK;
216                                         if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
217                                                 transagree++;
218                                                 /*
219                                                 sendbuf[0] = TRANS_AGREE;
220                                                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
221                                                         perror("");
222                                                 }
223                                                 */
224                                         } else {//If versions don't match
225                                                 transabort++;
226                                                 /*
227                                                 sendbuf[0] = TRANS_DISAGREE_ABORT;
228                                                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
229                                                         perror("");
230                                                 }
231                                                 */
232                                         }
233                                 }
234                         }       
235                         memcpy(headptr, buf+offset, sizeof(objheader_t));
236                         offset += sizeof(objheader_t);
237                 }
238         }
239         if (nummod) {
240                 if((tmpholder = objstrCreate(RECEIVE_BUFFER_SIZE)) == NULL) {
241                         perror("handleTransReq: Calloc error");
242                         return 1;
243                 }
244                 
245                 //Process each object id that is only modified
246                 for(i = 0; i < nummod; i++) {
247                         objheader_t *tmp;
248                         tmp = (objheader_t *)(buf + offset);
249                         //find if object is still present in the same machine since TRANS_REQUEST
250                         if ((mobj = mhashSearch(tmp->oid)) == NULL) {
251                                 objnotfound++;
252                                 /*
253                                    sendbuf[0] = OBJECT_NOT_FOUND;
254                                    if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
255                                    perror("");
256                                    }
257                                  */
258                         } else { // If obj found in machine (i.e. has not moved)
259                                 //Check if obj is locked
260                                 if ((((objheader_t *)mobj)->status >> 3) == 1) {
261                                         //Check version of the object
262                                         if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
263                                                 transdis++;
264                                                 /*
265                                                    sendbuf[0] = TRANS_DISAGREE;
266                                                    if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
267                                                    perror("");
268                                                    }
269                                                  */
270                                         } else {//If versions don't match ..HARD ABORT
271                                                 transabort++;
272                                                 /*
273                                                    sendbuf[0] = TRANS_DISAGREE_ABORT;
274                                                    if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
275                                                    perror("");
276                                                    }
277                                                  */
278                                         }
279                                 } else {// If object not locked then lock it
280                                         ((objheader_t *)mobj)->status |= LOCK;
281                                         if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
282                                                 transagree++;
283                                                 /*
284                                                    sendbuf[0] = TRANS_AGREE;
285                                                    if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
286                                                    perror("");
287                                                    }
288                                                  */
289                                         } else {//If versions don't match
290                                                 transabort++;
291                                                 /*
292                                                    sendbuf[0] = TRANS_DISAGREE_ABORT;
293                                                    if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
294                                                    perror("");
295                                                    }
296                                                  */
297                                         }
298                                 }
299                         }       
300
301                         size = sizeof(objheader_t) + classsize[tmp->type];
302                         if ((top = objstrAlloc(tmpholder, size)) == NULL) {
303                                         perror("handleTransReq: Calloc error");
304                                         return 1;
305                         }
306                         memcpy(top, buf+offset, size);
307                         offset += size;
308                 }
309         }
310         /*
311         if(transabort > 0) {
312                 sendbuf[0] = TRANS_DISAGREE_ABORT;
313                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
314                         perror("");
315                 }
316         
317         } else if(transagree == numread+nummod) {
318                 sendbuf[0] = TRANS_AGREE;
319                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
320                         perror("");
321                 }
322         } else {
323                 sendbuf[0] = TRANS_DISAGREE;
324                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
325                         perror("");
326                 }
327         }
328         */
329         return 0;
330 }
331 #endif