Trans commit initial version
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <pthread.h>
5 #include <netdb.h>
6 #include <fcntl.h>
7 #include "dstm.h"
8 #include "mlookup.h"
9 #include "llookup.h"
10
11 #define LISTEN_PORT 2156
12 #define BACKLOG 10 //max pending connections
13 #define RECEIVE_BUFFER_SIZE 2048
14
15 extern int classsize[];
16
17 objstr_t *mainobjstore;
18
19 int dstmInit(void)
20 {
21         //todo:initialize main object store
22         //do we want this to be a global variable, or provide
23         //separate access funtions and hide the structure?
24         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
25         if (mhashCreate(HASH_SIZE, LOADFACTOR))
26                 return 1; //failure
27         
28         if (lhashCreate(HASH_SIZE, LOADFACTOR))
29                 return 1; //failure
30         
31         //pthread_t threadListen;
32         //pthread_create(&threadListen, NULL, dstmListen, NULL);
33         
34         return 0;
35 }
36
37 void *dstmListen()
38 {
39         int listenfd, acceptfd;
40         struct sockaddr_in my_addr;
41         struct sockaddr_in client_addr;
42         socklen_t addrlength = sizeof(struct sockaddr);
43         pthread_t thread_dstm_accept;
44         int i;
45
46         listenfd = socket(AF_INET, SOCK_STREAM, 0);
47         if (listenfd == -1)
48         {
49                 perror("socket");
50                 exit(1);
51         }
52
53         my_addr.sin_family = AF_INET;
54         my_addr.sin_port = htons(LISTEN_PORT);
55         my_addr.sin_addr.s_addr = INADDR_ANY;
56         memset(&(my_addr.sin_zero), '\0', 8);
57
58         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
59         {
60                 perror("bind");
61                 exit(1);
62         }
63         
64         if (listen(listenfd, BACKLOG) == -1)
65         {
66                 perror("listen");
67                 exit(1);
68         }
69
70         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
71         while(1)
72         {
73                 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
74                 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
75         }
76         pthread_exit(NULL);
77 }
78
79 void *dstmAccept(void *acceptfd)
80 {
81         int numbytes,i,choice, oid;
82         char buffer[RECEIVE_BUFFER_SIZE], control;
83         void *srcObj;
84         objheader_t *h;
85         int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
86
87         printf("Recieved connection: fd = %d\n", (int)acceptfd);
88         numbytes = recv((int)acceptfd, (void *)buffer, sizeof(buffer), 0);
89         if (numbytes == -1)
90         {
91                 perror("recv");
92                 pthread_exit(NULL);
93         }
94         else
95         {
96                 control = buffer[0];
97                 switch(control) {
98                         case READ_REQUEST:
99                                 oid = *((int *)(buffer+1));
100 #ifdef DEBUG1
101                                 printf("DEBUG -> Received oid is %d\n", oid);
102 #endif
103                                 srcObj = mhashSearch(oid);
104                                 h = (objheader_t *) srcObj;
105                                 if (h == NULL) {
106                                         buffer[0] = OBJECT_NOT_FOUND;
107                                 } else {
108                                         buffer[0] = OBJECT_FOUND;
109                                         size = sizeof(objheader_t) + sizeof(classsize[h->type]);
110                                         memcpy(buffer+1, srcObj, size);
111                                 }
112 #ifdef DEBUG1
113                                 printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type);
114 #endif
115
116                                 if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
117                                         perror("");
118                                 }
119                                 break;
120                         case READ_MULT_REQUEST:
121                                 break;
122                         case MOVE_REQUEST:
123                                 break;
124                         case MOVE_MULT_REQUEST:
125                                 break;
126                         case TRANS_REQUEST:
127                                 printf("Client sent %d\n",buffer[0]);
128                                 int offset = 1;
129                                 printf("Num Read  %d\n",*((short*)(buffer+offset)));
130                                 offset += sizeof(short);
131                                 printf("Num modified  %d\n",*((short*)(buffer+offset)));
132                                 handleTransReq(acceptfd, buffer);
133                                 break;
134                         case TRANS_ABORT:
135                                 break;
136                         case TRANS_COMMIT:
137                                 break;
138                         default:
139                                 printf("Error receiving");
140                 }
141                 //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
142                 //printf("%s", buffer);
143         }
144         if (close((int)acceptfd) == -1)
145         {
146                 perror("close");
147         }
148         else
149                 printf("Closed connection: fd = %d\n", (int)acceptfd);
150         pthread_exit(NULL);
151 }
152
153 //TOOD put __FILE__ __LINE__ for all error conditions
154 int handleTransReq(int acceptfd, char *buf) {
155         short numread = 0, nummod = 0;
156         char control;
157         int offset = 0, size,i;
158         int objnotfound = 0, transdis = 0, transabort = 0, transagree = 0;
159         objheader_t *headptr = NULL;
160         objstr_t *tmpholder;
161         void *top, *mobj;
162         char sendbuf[RECEIVE_BUFFER_SIZE];
163
164         control = buf[0];
165         offset = 1;
166         numread = *((short *)(buf+offset));
167         offset += sizeof(short);
168         nummod = *((short *)(buf+offset));
169         offset += sizeof(short);
170         if (numread) {
171                 //Make an array to store the object headers for all objects that are only read
172                 if ((headptr = calloc(numread, sizeof(objheader_t))) == NULL) {
173                         perror("handleTransReq: Calloc error");
174                         return 1;
175                 }
176                 //Process each object id that is only read
177                 for (i = 0; i < numread; i++) {
178                         objheader_t *tmp;
179                         tmp = (objheader_t *) (buf + offset);
180                         //find if object is still present in the same machine since TRANS_REQUEST
181                         if ((mobj = mhashSearch(tmp->oid)) == NULL) {
182                                 objnotfound++;
183                                 /*
184                                 sendbuf[0] = OBJECT_NOT_FOUND;
185                                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
186                                         perror("");
187                                 }
188                                 */
189                                 } else { // If obj found in machine (i.e. has not moved)
190                                 //Check if obj is locked
191                                 if ((((objheader_t *)mobj)->status >> 3) == 1) {
192                                         //Check version of the object
193                                         if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
194                                                 transdis++;
195                                                 /*
196                                                 sendbuf[0] = TRANS_DISAGREE;
197                                                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
198                                                         perror("");
199                                                 }
200                                         */
201                                         } else {//If versions don't match ..HARD ABORT
202                                                 transabort++;
203                                                 /*
204                                                 sendbuf[0] = TRANS_DISAGREE_ABORT;
205                                                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
206                                                         perror("");
207                                                 }
208                                                 */
209                                         }
210                                 } else {// If object not locked then lock it
211                                         ((objheader_t *)mobj)->status |= LOCK;
212                                         if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
213                                                 transagree++;
214                                                 /*
215                                                 sendbuf[0] = TRANS_AGREE;
216                                                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
217                                                         perror("");
218                                                 }
219                                                 */
220                                         } else {//If versions don't match
221                                                 transabort++;
222                                                 /*
223                                                 sendbuf[0] = TRANS_DISAGREE_ABORT;
224                                                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
225                                                         perror("");
226                                                 }
227                                                 */
228                                         }
229                                 }
230                         }       
231                         memcpy(headptr, buf+offset, sizeof(objheader_t));
232                         offset += sizeof(objheader_t);
233                 }
234         }
235         if (nummod) {
236                 if((tmpholder = objstrCreate(RECEIVE_BUFFER_SIZE)) == NULL) {
237                         perror("handleTransReq: Calloc error");
238                         return 1;
239                 }
240                 
241                 //Process each object id that is only modified
242                 for(i = 0; i < nummod; i++) {
243                         objheader_t *tmp;
244                         tmp = (objheader_t *)(buf + offset);
245                         //find if object is still present in the same machine since TRANS_REQUEST
246                         if ((mobj = mhashSearch(tmp->oid)) == NULL) {
247                                 objnotfound++;
248                                 /*
249                                    sendbuf[0] = OBJECT_NOT_FOUND;
250                                    if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
251                                    perror("");
252                                    }
253                                  */
254                         } else { // If obj found in machine (i.e. has not moved)
255                                 //Check if obj is locked
256                                 if ((((objheader_t *)mobj)->status >> 3) == 1) {
257                                         //Check version of the object
258                                         if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
259                                                 transdis++;
260                                                 /*
261                                                    sendbuf[0] = TRANS_DISAGREE;
262                                                    if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
263                                                    perror("");
264                                                    }
265                                                  */
266                                         } else {//If versions don't match ..HARD ABORT
267                                                 transabort++;
268                                                 /*
269                                                    sendbuf[0] = TRANS_DISAGREE_ABORT;
270                                                    if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
271                                                    perror("");
272                                                    }
273                                                  */
274                                         }
275                                 } else {// If object not locked then lock it
276                                         ((objheader_t *)mobj)->status |= LOCK;
277                                         if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
278                                                 transagree++;
279                                                 /*
280                                                    sendbuf[0] = TRANS_AGREE;
281                                                    if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
282                                                    perror("");
283                                                    }
284                                                  */
285                                         } else {//If versions don't match
286                                                 transabort++;
287                                                 /*
288                                                    sendbuf[0] = TRANS_DISAGREE_ABORT;
289                                                    if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
290                                                    perror("");
291                                                    }
292                                                  */
293                                         }
294                                 }
295                         }       
296
297                         size = sizeof(objheader_t) + classsize[tmp->type];
298                         if ((top = objstrAlloc(tmpholder, size)) == NULL) {
299                                         perror("handleTransReq: Calloc error");
300                                         return 1;
301                         }
302                         memcpy(top, buf+offset, size);
303                         offset += size;
304                 }
305         }
306         if(transabort > 0) {
307                 sendbuf[0] = TRANS_DISAGREE_ABORT;
308                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
309                         perror("");
310                 }
311         
312         } else {
313                 sendbuf[0] = TRANS_AGREE;
314                 if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
315                         perror("");
316                 }
317         }
318         return 0;
319 }