4434ee002b91270cb0fba941d010467ec08d36d7
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
/* Coordinator => Machine that initiates the transaction request call for committing a transaction
 * Participant => Machine that hosts objects involved in a transaction commit */
3
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <pthread.h>
8 #include <netdb.h>
9 #include <fcntl.h>
10 #include <errno.h>
11 #include <string.h>
12 #include "dstm.h"
13 #include "mlookup.h"
14 #include "llookup.h"
15 #include "threadnotify.h"
16 #ifdef COMPILER
17 #include "thread.h"
18 #endif
19
20
/* TCP port that dstmListen() binds to and listens on */
#define LISTEN_PORT 2156
#define BACKLOG 10 //max pending connections
/* Size in bytes of the scratch receive buffer declared in dstmAccept() */
#define RECEIVE_BUFFER_SIZE 2048
/* Size in bytes of the scratch buffer declared in prefetchReq() */
#define PRE_BUF_SIZE 2048

/* Declared here, defined in another translation unit */
extern int classsize[];

/* Main object store; created by dstmInit() */
objstr_t *mainobjstore;
/* Lock held by transCommitProcess() while it overwrites an object in place;
 * dstmInit() initializes it with the recursive attribute below */
pthread_mutex_t mainobjstore_mutex;
pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
/* Statically initialized mutex; not referenced in the visible part of this file */
pthread_mutex_t threadnotify_mutex = PTHREAD_MUTEX_INITIALIZER;
32
33 /* This function initializes the main objects store and creates the 
34  * global machine and location lookup table */
35
36 int dstmInit(void)
37 {
38         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
39         /* Initialize attribute for mutex */
40         pthread_mutexattr_init(&mainobjstore_mutex_attr);
41         pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
42         pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
43         if (mhashCreate(HASH_SIZE, LOADFACTOR))
44                 return 1; //failure
45         
46         if (lhashCreate(HASH_SIZE, LOADFACTOR))
47                 return 1; //failure
48
49         if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
50                 return 1; //failure
51         
52         return 0;
53 }
54
55 /* This function starts the thread to listen on a socket 
56  * for tranaction calls */
57 void *dstmListen()
58 {
59         int listenfd, acceptfd;
60         struct sockaddr_in my_addr;
61         struct sockaddr_in client_addr;
62         socklen_t addrlength = sizeof(struct sockaddr);
63         pthread_t thread_dstm_accept;
64         int i;
65         int setsockflag=1;
66
67         listenfd = socket(AF_INET, SOCK_STREAM, 0);
68         if (listenfd == -1)
69         {
70                 perror("socket");
71                 exit(1);
72         }
73
74         if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
75           perror("socket");
76           exit(1);
77         }
78 #ifdef MAC
79         if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
80           perror("socket");
81           exit(1);
82         }
83 #endif
84
85         my_addr.sin_family = AF_INET;
86         my_addr.sin_port = htons(LISTEN_PORT);
87         my_addr.sin_addr.s_addr = INADDR_ANY;
88         memset(&(my_addr.sin_zero), '\0', 8);
89
90         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
91         {
92                 perror("bind");
93                 exit(1);
94         }
95         
96         if (listen(listenfd, BACKLOG) == -1)
97         {
98                 perror("listen");
99                 exit(1);
100         }
101
102         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
103         while(1)
104         {
105           int retval;
106           acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
107           do {
108             retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
109           } while(retval!=0);
110           pthread_detach(thread_dstm_accept);
111         }
112 }
113 /* This function accepts a new connection request, decodes the control message in the connection 
114  * and accordingly calls other functions to process new requests */
115 void *dstmAccept(void *acceptfd)
116 {
117         int val, retval, size;
118         unsigned int oid;
119         char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
120         char *ptr;
121         void *srcObj;
122         objheader_t *h;
123         trans_commit_data_t transinfo;
124         unsigned short objType, *versionarry, version;
125         unsigned int *oidarry, numoid, mid, threadid;
126         
127         int i;
128
129         transinfo.objlocked = NULL;
130         transinfo.objnotfound = NULL;
131         transinfo.modptr = NULL;
132         transinfo.numlocked = 0;
133         transinfo.numnotfound = 0;
134
135         /* Receive control messages from other machines */
136         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
137                 perror("Error: in receiving control from coordinator\n");
138                 pthread_exit(NULL);
139         }
140         
141         switch(control) {
142                 case READ_REQUEST:
143                         /* Read oid requested and search if available */
144                         if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
145                                 perror("Error: receiving 0x0 object from cooridnator\n");
146                                 pthread_exit(NULL);
147                         }
148                         if((srcObj = mhashSearch(oid)) == NULL) {
149                                 printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
150                                 pthread_exit(NULL);
151                         }
152                         h = (objheader_t *) srcObj;
153                         GETSIZE(size, h);
154                         size += sizeof(objheader_t);
155
156                         if (h == NULL) {
157                                 ctrl = OBJECT_NOT_FOUND;
158                                 if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
159                                         perror("Error sending control msg to coordinator\n");
160                                         pthread_exit(NULL);
161                                 }
162                         } else {
163                                 /* Type */
164                                 char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
165                                 *((int *)&msg[1])=size;
166                                 if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
167                                         perror("Error sending size of object to coordinator\n");
168                                         pthread_exit(NULL);
169                                 }
170                                 if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
171                                         perror("Error in sending object\n");
172                                         pthread_exit(NULL);
173                                 }
174                         }
175                         break;
176                 
177                 case READ_MULT_REQUEST:
178                         break;
179         
180                 case MOVE_REQUEST:
181                         break;
182
183                 case MOVE_MULT_REQUEST:
184                         break;
185
186                 case TRANS_REQUEST:
187                         /* Read transaction request */
188                         if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
189                                 printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
190                                 pthread_exit(NULL);
191                         }
192                         break;
193                 case TRANS_PREFETCH:
194                         if((val = prefetchReq((int)acceptfd)) != 0) {
195                                 printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
196                                 pthread_exit(NULL);
197                         }
198                         break;
199                 case START_REMOTE_THREAD:
200                         retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
201                         if (retval <= 0)
202                                 perror("dstmAccept(): error receiving START_REMOTE_THREAD msg");
203                         else if (retval != sizeof(unsigned int))
204                                 printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD %s, %d\n",
205                                         retval, __FILE__, __LINE__);
206                         else
207                         {
208                                 objType = getObjType(oid);
209                                 startDSMthread(oid, objType);
210                         }
211                         break;
212
213                 case THREAD_NOTIFY_REQUEST:
214                         size = sizeof(unsigned int);
215                         bzero(&buffer, RECEIVE_BUFFER_SIZE);
216                         retval = recv((int)acceptfd, &buffer, size, 0);
217                         numoid = *((unsigned int *) &buffer);
218                         size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
219                         bzero(&buffer, RECEIVE_BUFFER_SIZE);
220                         retval = recv((int)acceptfd, &buffer, size, 0);
221                         if(retval <=0)
222                                 perror("dstmAccept(): error receiving THREAD_NOTIFY_REQUEST");
223                         else if( retval != (2* sizeof(unsigned int) + (sizeof(unsigned int) + sizeof(unsigned short)) * numoid))
224                                 printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_REQUEST %s, %d\n", retval, 
225                                                 __FILE__, __LINE__);
226                         else {
227                                 oidarry = calloc(numoid, sizeof(unsigned int)); 
228                                 memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
229                                 size = sizeof(unsigned int) * numoid;
230                                 versionarry = calloc(numoid, sizeof(unsigned short));
231                                 memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
232                                 size += sizeof(unsigned short) * numoid;
233                                 mid = *((unsigned int *)(buffer+size));
234                                 size += sizeof(unsigned int);
235                                 threadid = *((unsigned int *)(buffer+size));
236                                 processReqNotify(numoid, oidarry, versionarry, mid, threadid);
237                         }
238
239                         break;
240
241                 case THREAD_NOTIFY_RESPONSE:
242                         size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
243                         bzero(&buffer, RECEIVE_BUFFER_SIZE);
244                         retval = recv((int)acceptfd, &buffer, size, 0);
245                         if(retval <= 0) 
246                                 perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE");
247                         else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short))
248                                 printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_RESPONSE msg %s, %d\n", 
249                                                 retval, __FILE__, __LINE__);
250                         else {
251                                 oid = *((unsigned int *)buffer);
252                                 size = sizeof(unsigned int);
253                                 version = *((unsigned short *)(buffer+size));
254                                 size += sizeof(unsigned short);
255                                 threadid = *((unsigned int *)(buffer+size));
256                                 threadNotify(oid,version,threadid);
257                         }
258
259                         break;
260
261                 default:
262                         printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
263         }
264
265         /* Close connection */
266         if (close((int)acceptfd) == -1)
267                 perror("close");
268         
269         pthread_exit(NULL);
270 }
271
272 /* This function reads the information available in a transaction request
273  * and makes a function call to process the request */
274 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
275         char *ptr;
276         void *modptr;
277         unsigned int *oidmod, oid;
278         fixed_data_t fixed;
279         objheader_t *headaddr;
280         int sum = 0, i, N, n, val;
281
282         oidmod = NULL;
283
284         /* Read fixed_data_t data structure */ 
285         N = sizeof(fixed) - 1;
286         ptr = (char *)&fixed;;
287         fixed.control = TRANS_REQUEST;
288         do {
289                 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
290                 sum += n;
291         } while(sum < N && n != 0); 
292
293         /* Read list of mids */
294         int mcount = fixed.mcount;
295         N = mcount * sizeof(unsigned int);
296         unsigned int listmid[mcount];
297         ptr = (char *) listmid;
298         sum = 0;
299         do {
300                 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
301                 sum += n;
302         } while(sum < N && n != 0);
303
304         /* Read oid and version tuples for those objects that are not modified in the transaction */
305         int numread = fixed.numread;
306         N = numread * (sizeof(unsigned int) + sizeof(short));
307         char objread[N];
308         if(numread != 0) { //If pile contains more than one object to be read, 
309                           // keep reading all objects
310                 sum = 0;
311                 do {
312                         n = recv((int)acceptfd, (void *) objread, N, 0);
313                         sum += n;
314                 } while(sum < N && n != 0);
315         }
316         
317         /* Read modified objects */
318         if(fixed.nummod != 0) {
319                 if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
320                         printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
321                         return 1;
322                 }
323                 sum = 0;
324                 do { // Recv the objs that are modified by the Coordinator
325                         n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
326                         sum += n;
327                 } while (sum < fixed.sum_bytes && n != 0);
328         }
329
330         /* Create an array of oids for modified objects */
331         oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
332         if (oidmod == NULL)
333         {
334                 printf("calloc error %s, %d\n", __FILE__, __LINE__);
335                 return 1;
336         }
337         ptr = (char *) modptr;
338         for(i = 0 ; i < fixed.nummod; i++) {
339           int tmpsize;
340           headaddr = (objheader_t *) ptr;
341           oid = OID(headaddr);
342           oidmod[i] = oid;
343           GETSIZE(tmpsize, headaddr);
344           ptr += sizeof(objheader_t) + tmpsize;
345         }
346         
347         /*Process the information read */
348         if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
349                 printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
350                 /* Free resources */
351                 if(oidmod != NULL) {
352                         free(oidmod);
353                 }
354                 return 1;
355         }
356
357         /* Free resources */
358         if(oidmod != NULL) {
359                 free(oidmod);
360         }
361
362         return 0;
363 }
364
365 /* This function processes the Coordinator's transaction request using "handleTransReq" 
366  * function and sends a reply to the co-ordinator.
367  * Following this it also receives a new control message from the co-ordinator and processes this message*/
368 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
369                 unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
370         char control, sendctrl;
371         objheader_t *tmp_header;
372         void *header;
373         int  i = 0, val, retval;
374
375         /* Send reply to the Coordinator */
376         if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
377                 printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
378                 return 1;
379         }
380
381         do {
382                 retval = recv((int)acceptfd, &control, sizeof(char), 0);
383         } while(retval < sizeof(char));
384
385         /* Process the new control message */
386         switch(control) {
387                 case TRANS_ABORT:
388                         if (fixed->nummod > 0)
389                                 free(modptr);
390                         /* Unlock objects that was locked due to this transaction */
391                         for(i = 0; i< transinfo->numlocked; i++) {
392                                 header = mhashSearch(transinfo->objlocked[i]);// find the header address
393                                 STATUS(((objheader_t *)header)) &= ~(LOCK);             
394                         }
395
396                         /* Send ack to Coordinator */
397                         sendctrl = TRANS_UNSUCESSFUL;
398                         if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
399                                 perror("Error: In sending ACK to coordinator\n");
400                                 if (transinfo->objlocked != NULL) {
401                                         free(transinfo->objlocked);
402                                 }
403                                 if (transinfo->objnotfound != NULL) {
404                                         free(transinfo->objnotfound);
405                                 }
406
407                                 return 1;
408                         }
409                         break;
410
411                 case TRANS_COMMIT:
412                         /* Invoke the transCommit process() */
413                         if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
414                                 printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
415                                 /* Free memory */
416                                 if (transinfo->objlocked != NULL) {
417                                         free(transinfo->objlocked);
418                                 }
419                                 if (transinfo->objnotfound != NULL) {
420                                         free(transinfo->objnotfound);
421                                 }
422                                 return 1;
423                         }
424                         break;
425
426                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
427                         break;
428                 default:
429                         printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
430                         //TODO Use fixed.trans_id  TID since Client may have died
431                         break;
432         }
433
434         /* Free memory */
435         if (transinfo->objlocked != NULL) {
436                 free(transinfo->objlocked);
437         }
438         if (transinfo->objnotfound != NULL) {
439                 free(transinfo->objnotfound);
440         }
441
442         return 0;
443 }
444
445 /* This function increments counters while running a voting decision on all objects involved 
446  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
447 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
448         int val, i = 0;
449         unsigned short version;
450         char control = 0, *ptr;
451         unsigned int oid;
452         unsigned int *oidnotfound, *oidlocked;
453         void *mobj;
454         objheader_t *headptr;
455
456         /* Counters and arrays to formulate decision on control message to be sent */
457         oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
458         oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
459         int objnotfound = 0, objlocked = 0;
460         int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
461
462         /* modptr points to the beginning of the object store 
463          * created at the Pariticipant. 
464          * Object store holds the modified objects involved in the transaction request */ 
465         ptr = (char *) modptr;
466         
467         /* Process each oid in the machine pile/ group per thread */
468         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
469                 if (i < fixed->numread) {//Objs only read and not modified
470                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
471                         incr *= i;
472                         oid = *((unsigned int *)(objread + incr));
473                         incr += sizeof(unsigned int);
474                         version = *((unsigned short *)(objread + incr));
475                 } else {//Objs modified
476                   int tmpsize;
477                   headptr = (objheader_t *) ptr;
478                   oid = OID(headptr);
479                   version = headptr->version;
480                   GETSIZE(tmpsize, headptr);
481                   ptr += sizeof(objheader_t) + tmpsize;
482                 }
483                 
484                 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
485
486                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
487                         /* Save the oids not found and number of oids not found for later use */
488                         oidnotfound[objnotfound] = oid;
489                         objnotfound++;
490                 } else { /* If Obj found in machine (i.e. has not moved) */
491                         /* Check if Obj is locked by any previous transaction */
492                         if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {           
493                                 if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
494                                         v_matchlock++;
495                                 } else {/* If versions don't match ...HARD ABORT */
496                                         v_nomatch++;
497                                         /* Send TRANS_DISAGREE to Coordinator */
498                                         control = TRANS_DISAGREE;
499                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
500                                                 perror("Error in sending control to the Coordinator\n");
501                                                 return 0;
502                                         }
503                                         return control;
504                                 }
505                         } else {/* If Obj is not locked then lock object */
506                                 STATUS(((objheader_t *)mobj)) |= LOCK;
507                                 /* Save all object oids that are locked on this machine during this transaction request call */
508                                 oidlocked[objlocked] = OID(((objheader_t *)mobj));
509                                 objlocked++;
510                                 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
511                                         v_matchnolock++;
512                                 } else { /* If versions don't match ...HARD ABORT */
513                                         v_nomatch++;
514                                         control = TRANS_DISAGREE;
515                                         /* Send TRANS_DISAGREE to Coordinator */
516                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
517                                                 perror("Error in sending control to the Coordinator\n");
518                                                 return 0;
519                                         }
520                                         if (objlocked > 0) {
521                                                 STATUS(((objheader_t *)mobj)) &= ~(LOCK);
522                                                 free(oidlocked);
523                                         }
524                                         return control;
525                                 }
526                         }
527                 }
528         }
529         
530         /* Decide what control message to send to Coordinator */
531         if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
532                                         modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
533                 printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
534                 return 0;
535         }
536         
537         return val;
538
539 }
540 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
541  * to send to Coordinator based on the votes of oids involved in the transaction */
542 int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, 
543                 int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, 
544                 unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
545         int val;
546         char control = 0;
547         /* Condition to send TRANS_AGREE */
548         if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
549                 control = TRANS_AGREE;
550                 /* Send control message */
551                 if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
552                         perror("Error in sending control to Coordinator\n");
553                         return 0;
554                 }
555         }
556         /* Condition to send TRANS_SOFT_ABORT */
557         if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
558                 control = TRANS_SOFT_ABORT;
559                 char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
560                 *((int*)&msg[1])= *(objnotfound);
561
562                 /* Send control message */
563                 if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
564                         perror("Error in sending no of objects that are not found\n");
565                         return 0;
566                 }
567                 /* Send number of oids not found and the missing oids if objects are missing in the machine */
568                 if(*(objnotfound) != 0) { 
569                         int size = sizeof(unsigned int)* *(objnotfound);
570                         if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
571                                 perror("Error in sending objects that are not found\n");
572                                 return 0;
573                         }
574                 }
575         }
576
577         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
578          * if Participant receives a TRANS_COMMIT */
579         transinfo->objlocked = oidlocked;
580         transinfo->objnotfound = oidnotfound;
581         transinfo->modptr = modptr;
582         transinfo->numlocked = *(objlocked);
583         transinfo->numnotfound = *(objnotfound);
584         
585         return control;
586 }
587
588 /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer 
589  * addresses in lookup table and also changes version number
590  * Sends an ACK back to Coordinator */
591 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
592         objheader_t *header;
593         objheader_t *newheader;
594         int i = 0, offset = 0;
595         char control;
596         int tmpsize;
597
598         /* Process each modified object saved in the mainobject store */
599         for(i = 0; i < nummod; i++) {
600                 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
601                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
602                         return 1;
603                 }
604                 GETSIZE(tmpsize,header);
605                 pthread_mutex_lock(&mainobjstore_mutex);
606                 memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
607                 header->version += 1; 
608                 /* If threads are waiting on this object to be updated, notify them */
609                 if(header->notifylist != NULL) {
610                         notifyAll(&header->notifylist, OID(header), header->version);
611                 }
612                 pthread_mutex_unlock(&mainobjstore_mutex);
613                 offset += sizeof(objheader_t) + tmpsize;
614         }
615
616         if (nummod > 0)
617                 free(modptr);
618
619         /* Unlock locked objects */
620         for(i = 0; i < numlocked; i++) {
621                 if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
622                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
623                         return 1;
624                 }
625                 STATUS(header) &= ~(LOCK);
626         }
627         //TODO Update location lookup table
628
629         /* Send ack to coordinator */
630         control = TRANS_SUCESSFUL;
631         if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
632                 perror("Error sending ACK to coordinator\n");
633                 return 1;
634         }
635         
636         return 0;
637 }
638
639 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
640  * Looks for the objects to be prefetched in the main object store.
641  * If objects are not found then record those and if objects are found
642  * then use offset values to prefetch references to other objects */
643
644 int prefetchReq(int acceptfd) {
645         int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
646         int isArray = 0;
647         unsigned int oid, index = 0;
648         char *ptr, buffer[PRE_BUF_SIZE];
649         void *mobj;
650         unsigned int objoid;
651         char control;
652         objheader_t * header;
653         int bytesRecvd;
654 /*
655         unsigned int myIpAddr;
656
657 #ifdef MAC
658         myIpAddr = getMyIpAddr("en1");
659 #else
660         myIpAddr = getMyIpAddr("eth0");
661 #endif
662 */
663         /* Repeatedly recv the oid and offset pairs sent for prefetch */
664         while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
665                 count++;
666                 if(length == -1)
667                         break;
668                 index = sizeof(unsigned int); // Index starts with sizeof  unsigned int because the 
669                 // first 4 bytes are saved to send the
670                 // size of the buffer (that is computed at the end of the loop)
671                 bytesRecvd = 0;
672                 do {
673                         bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
674                                         sizeof(unsigned int) - bytesRecvd, 0);
675                 } while (bytesRecvd < sizeof(unsigned int));
676                 numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
677                 N = numoffset * sizeof(short);
678                 short offset[numoffset];
679                 ptr = (char *)&offset;
680                 sum = 0;
681                 /* Recv the offset values per oid */ 
682                 do {
683                         n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); 
684                         sum += n; 
685                 } while(sum < N && n != 0);     
686
687                 /* Process each oid */
688                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
689                         /* Save the oids not found in buffer for later use */
690                         *(buffer + index) = OBJECT_NOT_FOUND;
691                         index += sizeof(char);
692                         *((unsigned int *)(buffer+index)) = oid;
693                         index += sizeof(unsigned int);
694                 } else { /* If Obj found in machine (i.e. has not moved) */
695                         /* send the oid, it's size, it's header and data */
696                         header = (objheader_t *)mobj;
697                         GETSIZE(size, header);
698                         size += sizeof(objheader_t);
699                         *(buffer + index) = OBJECT_FOUND;
700                         index += sizeof(char);
701                         *((unsigned int *)(buffer+index)) = oid;
702                         index += sizeof(unsigned int);
703                         *((int *)(buffer+index)) = size;
704                         index += sizeof(int);
705                         memcpy(buffer + index, header, size);
706                         index += size;
707                         /* Calculate the oid corresponding to the offset value */
708                         for(i = 0 ; i< numoffset ; i++) {
709                                 /* Check for arrays  */
710                                 if(TYPE(header) > NUMCLASSES) {
711                                         isArray = 1;
712                                 }
713                                 if(isArray == 1) {
714                                         int elementsize = classsize[TYPE(header)];
715                                         objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i])));
716                                 } else {
717                                         objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i]));
718                                 }
719                                 if((header = mhashSearch(objoid)) == NULL) {
720                                         /* Obj not found, send oid */
721                                         *(buffer + index) = OBJECT_NOT_FOUND;
722                                         index += sizeof(char);
723                                         *((unsigned int *)(buffer+index)) = objoid;
724                                         index += sizeof(unsigned int);
725                                         break;
726                                 } else {/* Obj Found */
727                                         /* send the oid, it's size, it's header and data */
728                                         GETSIZE(size, header);
729                                         size+=sizeof(objheader_t);
730                                         *(buffer+index) = OBJECT_FOUND;
731                                         index += sizeof(char);
732                                         *((unsigned int *)(buffer+index)) = objoid;
733                                         index += sizeof(unsigned int);
734                                         *((int *)(buffer+index)) = size;
735                                         index += sizeof(int);
736                                         memcpy(buffer+index, header, size);
737                                         index += size;
738                                         isArray = 0;
739                                         continue;
740                                 }
741                         }
742                 }
743                 /* Check for overflow in the buffer */
744                 if (index >= PRE_BUF_SIZE) {
745                         printf("Error: Buffer array overflow %s, %d\n", __FILE__, __LINE__);
746                         return 1;
747                 }
748                 /* Send Prefetch response control message only once*/
749                 if(count == 1){
750                         control = TRANS_PREFETCH_RESPONSE;
751                         if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
752                                 perror("Error: in sending PREFETCH RESPONSE to Coordinator\n");
753                                 return 1;
754                         }
755                 }
756
757                 /* Add the buffer size into buffer as a parameter */
758                 *((unsigned int *)buffer)=index;
759
760                 /* Send the entire buffer with its size and oids found and not found */
761                 if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
762                         perror("Error: sending oids found\n");
763                         return 1;
764                 }
765         }
766         return 0;
767 }
768
769 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
770         objheader_t *header;
771         unsigned int oid;
772         unsigned short newversion;
773         char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
774         int sd;
775         struct sockaddr_in remoteAddr;
776         int bytesSent;
777         int status, size;
778
779         int i = 0;
780         while(i < numoid) {
781                 oid = *(oidarry + i);
782                 if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
783                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
784                         return;
785                 } else {
786                         /* Check to see if versions are same */
787 checkversion:
788                         if ((STATUS(header) & LOCK) != LOCK) {          
789                                 STATUS(header) |= LOCK;
790                                 newversion = header->version;
791                                 if(newversion == *(versionarry + i)) {
792                                         //Add to the notify list 
793                                         if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
794                                                 printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); 
795                                                 return;
796                                         }
797                                         STATUS(header) &= ~(LOCK);              
798                                 } else {
799                                         STATUS(header) &= ~(LOCK);              
800                                         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
801                                                 perror("processReqNotify():socket()");
802                                                 return;
803                                         }
804                                         bzero(&remoteAddr, sizeof(remoteAddr));
805                                         remoteAddr.sin_family = AF_INET;
806                                         remoteAddr.sin_port = htons(LISTEN_PORT);
807                                         remoteAddr.sin_addr.s_addr = htonl(mid);
808
809                                         if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
810                                                 printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
811                                                                 inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
812                                                 status = -1;
813                                         } else {
814                                                 //Send Update notification
815                                                 msg[0] = THREAD_NOTIFY_RESPONSE;
816                                                 *((unsigned int *)&msg[1]) = oid;
817                                                 size = sizeof(unsigned int);
818                                                 *((unsigned short *)(&msg[1]+size)) = newversion;
819                                                 size += sizeof(unsigned short);
820                                                 *((unsigned int *)(&msg[1]+size)) = threadid;
821                                                 bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
822                                                 if (bytesSent < 0){
823                                                         perror("processReqNotify():send()");
824                                                         status = -1;
825                                                 } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){
826                                                         printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n", 
827                                                                         bytesSent, __FILE__, __LINE__);
828                                                         status = -1;
829                                                 } else {
830                                                         status = 0;
831                                                 }
832
833                                         }
834                                         close(sd);
835                                 }
836                         } else {
837                                 randomdelay();
838                                 goto checkversion;
839                         }
840                 }
841                 i++;
842         }
843         free(oidarry);
844         free(versionarry);
845 }
846