bug fixes for evil garbage collection bugs...
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 /* Coordinator => Machine that initiates the transaction request call for commiting a transaction
2  * Participant => Machines that host the objects involved in a transaction commit */
3
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <pthread.h>
8 #include <netdb.h>
9 #include <fcntl.h>
10 #include <errno.h>
11 #include <string.h>
12 #include "dstm.h"
13 #include "mlookup.h"
14 #include "llookup.h"
15 #include "threadnotify.h"
16 #ifdef COMPILER
17 #include "thread.h"
18 #endif
19
20
21 #define LISTEN_PORT 2156
22 #define BACKLOG 10 //max pending connections
23 #define RECEIVE_BUFFER_SIZE 2048
24 #define PRE_BUF_SIZE 2048
25
26 extern int classsize[];
27
28 objstr_t *mainobjstore;
29 pthread_mutex_t mainobjstore_mutex;
30 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
31 pthread_mutex_t threadnotify_mutex = PTHREAD_MUTEX_INITIALIZER;
32
33 /* This function initializes the main objects store and creates the 
34  * global machine and location lookup table */
35
36 int dstmInit(void)
37 {
38         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
39         /* Initialize attribute for mutex */
40         pthread_mutexattr_init(&mainobjstore_mutex_attr);
41         pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
42         pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
43         if (mhashCreate(HASH_SIZE, LOADFACTOR))
44                 return 1; //failure
45         
46         if (lhashCreate(HASH_SIZE, LOADFACTOR))
47                 return 1; //failure
48
49         if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
50                 return 1; //failure
51         
52         return 0;
53 }
54
55 /* This function starts the thread to listen on a socket 
56  * for tranaction calls */
57 void *dstmListen()
58 {
59         int listenfd, acceptfd;
60         struct sockaddr_in my_addr;
61         struct sockaddr_in client_addr;
62         socklen_t addrlength = sizeof(struct sockaddr);
63         pthread_t thread_dstm_accept;
64         int i;
65         int setsockflag=1;
66
67         listenfd = socket(AF_INET, SOCK_STREAM, 0);
68         if (listenfd == -1)
69         {
70                 perror("socket");
71                 exit(1);
72         }
73
74         if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
75           perror("socket");
76           exit(1);
77         }
78 #ifdef MAC
79         if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
80           perror("socket");
81           exit(1);
82         }
83 #endif
84
85         my_addr.sin_family = AF_INET;
86         my_addr.sin_port = htons(LISTEN_PORT);
87         my_addr.sin_addr.s_addr = INADDR_ANY;
88         memset(&(my_addr.sin_zero), '\0', 8);
89
90         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
91         {
92                 perror("bind");
93                 exit(1);
94         }
95         
96         if (listen(listenfd, BACKLOG) == -1)
97         {
98                 perror("listen");
99                 exit(1);
100         }
101
102         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
103         while(1)
104         {
105           int retval;
106           acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
107           do {
108             retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
109           } while(retval!=0);
110           pthread_detach(thread_dstm_accept);
111         }
112 }
113 /* This function accepts a new connection request, decodes the control message in the connection 
114  * and accordingly calls other functions to process new requests */
115 void *dstmAccept(void *acceptfd)
116 {
117         int val, retval, size;
118         unsigned int oid;
119         char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
120         char *ptr;
121         void *srcObj;
122         objheader_t *h;
123         trans_commit_data_t transinfo;
124         unsigned short objType, *versionarry, version;
125         unsigned int *oidarry, numoid, mid, threadid;
126         
127         int i;
128
129         transinfo.objlocked = NULL;
130         transinfo.objnotfound = NULL;
131         transinfo.modptr = NULL;
132         transinfo.numlocked = 0;
133         transinfo.numnotfound = 0;
134
135         /* Receive control messages from other machines */
136         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
137                 perror("Error: in receiving control from coordinator\n");
138                 pthread_exit(NULL);
139         }
140         
141         switch(control) {
142                 case READ_REQUEST:
143                         /* Read oid requested and search if available */
144                         if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
145                                 perror("Error: receiving 0x0 object from cooridnator\n");
146                                 pthread_exit(NULL);
147                         }
148                         if((srcObj = mhashSearch(oid)) == NULL) {
149                                 printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
150                                 pthread_exit(NULL);
151                         }
152                         h = (objheader_t *) srcObj;
153                         GETSIZE(size, h);
154                         size += sizeof(objheader_t);
155
156                         if (h == NULL) {
157                                 ctrl = OBJECT_NOT_FOUND;
158                                 if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
159                                         perror("Error sending control msg to coordinator\n");
160                                         pthread_exit(NULL);
161                                 }
162                         } else {
163                                 /* Type */
164                                 char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
165                                 *((int *)&msg[1])=size;
166                                 if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
167                                         perror("Error sending size of object to coordinator\n");
168                                         pthread_exit(NULL);
169                                 }
170                                 if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
171                                         perror("Error in sending object\n");
172                                         pthread_exit(NULL);
173                                 }
174                         }
175                         break;
176                 
177                 case READ_MULT_REQUEST:
178                         break;
179         
180                 case MOVE_REQUEST:
181                         break;
182
183                 case MOVE_MULT_REQUEST:
184                         break;
185
186                 case TRANS_REQUEST:
187                         /* Read transaction request */
188                         if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
189                                 printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
190                                 pthread_exit(NULL);
191                         }
192                         break;
193                 case TRANS_PREFETCH:
194                         if((val = prefetchReq((int)acceptfd)) != 0) {
195                                 printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
196                                 pthread_exit(NULL);
197                         }
198                         break;
199                 case START_REMOTE_THREAD:
200                         retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
201                         if (retval <= 0)
202                                 perror("dstmAccept(): error receiving START_REMOTE_THREAD msg");
203                         else if (retval != sizeof(unsigned int))
204                                 printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD %s, %d\n",
205                                         retval, __FILE__, __LINE__);
206                         else
207                         {
208                                 objType = getObjType(oid);
209                                 startDSMthread(oid, objType);
210                         }
211                         break;
212
213                 case THREAD_NOTIFY_REQUEST:
214                         size = sizeof(unsigned int);
215                         bzero(&buffer, RECEIVE_BUFFER_SIZE);
216                         retval = recv((int)acceptfd, &buffer, size, 0);
217                         numoid = *((unsigned int *) &buffer);
218                         size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
219                         bzero(&buffer, RECEIVE_BUFFER_SIZE);
220                         retval = recv((int)acceptfd, &buffer, size, 0);
221                         if(retval <=0)
222                                 perror("dstmAccept(): error receiving THREAD_NOTIFY_REQUEST");
223                         else if( retval != (2* sizeof(unsigned int) + (sizeof(unsigned int) + sizeof(unsigned short)) * numoid))
224                                 printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_REQUEST %s, %d\n", retval, 
225                                                 __FILE__, __LINE__);
226                         else {
227                                 oidarry = calloc(numoid, sizeof(unsigned int)); 
228                                 memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
229                                 size = sizeof(unsigned int) * numoid;
230                                 versionarry = calloc(numoid, sizeof(unsigned short));
231                                 memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
232                                 size += sizeof(unsigned short) * numoid;
233                                 mid = *((unsigned int *)(buffer+size));
234                                 size += sizeof(unsigned int);
235                                 threadid = *((unsigned int *)(buffer+size));
236                                 processReqNotify(numoid, oidarry, versionarry, mid, threadid);
237                         }
238
239                         break;
240
241                 case THREAD_NOTIFY_RESPONSE:
242                         size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
243                         bzero(&buffer, RECEIVE_BUFFER_SIZE);
244                         retval = recv((int)acceptfd, &buffer, size, 0);
245                         if(retval <= 0) 
246                                 perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE");
247                         else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short))
248                                 printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_RESPONSE msg %s, %d\n", 
249                                                 retval, __FILE__, __LINE__);
250                         else {
251                                 oid = *((unsigned int *)buffer);
252                                 size = sizeof(unsigned int);
253                                 version = *((unsigned short *)(buffer+size));
254                                 size += sizeof(unsigned short);
255                                 threadid = *((unsigned int *)(buffer+size));
256                                 threadNotify(oid,version,threadid);
257                         }
258
259                         break;
260
261                 default:
262                         printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
263         }
264
265         /* Close connection */
266         if (close((int)acceptfd) == -1)
267                 perror("close");
268         
269         pthread_exit(NULL);
270 }
271
272 /* This function reads the information available in a transaction request
273  * and makes a function call to process the request */
274 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
275         char *ptr;
276         void *modptr;
277         unsigned int *oidmod, oid;
278         fixed_data_t fixed;
279         objheader_t *headaddr;
280         int sum = 0, i, N, n, val;
281
282         oidmod = NULL;
283
284         /* Read fixed_data_t data structure */ 
285         N = sizeof(fixed) - 1;
286         ptr = (char *)&fixed;;
287         fixed.control = TRANS_REQUEST;
288         do {
289                 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
290                 sum += n;
291         } while(sum < N && n != 0); 
292
293         /* Read list of mids */
294         int mcount = fixed.mcount;
295         N = mcount * sizeof(unsigned int);
296         unsigned int listmid[mcount];
297         ptr = (char *) listmid;
298         sum = 0;
299         do {
300                 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
301                 sum += n;
302         } while(sum < N && n != 0);
303
304         /* Read oid and version tuples for those objects that are not modified in the transaction */
305         int numread = fixed.numread;
306         N = numread * (sizeof(unsigned int) + sizeof(short));
307         char objread[N];
308         if(numread != 0) { //If pile contains more than one object to be read, 
309                           // keep reading all objects
310                 sum = 0;
311                 do {
312                         n = recv((int)acceptfd, (void *) objread, N, 0);
313                         sum += n;
314                 } while(sum < N && n != 0);
315         }
316         
317         /* Read modified objects */
318         if(fixed.nummod != 0) {
319                 if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
320                         printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
321                         return 1;
322                 }
323                 sum = 0;
324                 do { // Recv the objs that are modified by the Coordinator
325                         n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
326                         sum += n;
327                 } while (sum < fixed.sum_bytes && n != 0);
328         }
329
330         /* Create an array of oids for modified objects */
331         oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
332         if (oidmod == NULL)
333         {
334                 printf("calloc error %s, %d\n", __FILE__, __LINE__);
335                 return 1;
336         }
337         ptr = (char *) modptr;
338         for(i = 0 ; i < fixed.nummod; i++) {
339           int tmpsize;
340           headaddr = (objheader_t *) ptr;
341           oid = OID(headaddr);
342           oidmod[i] = oid;
343           GETSIZE(tmpsize, headaddr);
344           ptr += sizeof(objheader_t) + tmpsize;
345         }
346         
347         /*Process the information read */
348         if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
349                 printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
350                 /* Free resources */
351                 if(oidmod != NULL) {
352                         free(oidmod);
353                 }
354                 return 1;
355         }
356
357         /* Free resources */
358         if(oidmod != NULL) {
359                 free(oidmod);
360         }
361
362         return 0;
363 }
364
365 /* This function processes the Coordinator's transaction request using "handleTransReq" 
366  * function and sends a reply to the co-ordinator.
367  * Following this it also receives a new control message from the co-ordinator and processes this message*/
368 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
369                 unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
370         char control, sendctrl, retval;
371         objheader_t *tmp_header;
372         void *header;
373         int  i = 0, val;
374
375         /* Send reply to the Coordinator */
376         if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
377                 printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
378                 return 1;
379         }
380
381         do {
382                 retval = recv((int)acceptfd, &control, sizeof(char), 0);
383         } while(retval < sizeof(char));
384
385         /* Process the new control message */
386         switch(control) {
387                 case TRANS_ABORT:
388                         if (fixed->nummod > 0)
389                                 free(modptr);
390                         /* Unlock objects that was locked due to this transaction */
391                         for(i = 0; i< transinfo->numlocked; i++) {
392                                 if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
393                                         printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address
394                                         return 1;
395                                 }
396                                 STATUS(((objheader_t *)header)) &= ~(LOCK);             
397                         }
398
399                         /* Send ack to Coordinator */
400                         sendctrl = TRANS_UNSUCESSFUL;
401                         if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
402                                 perror("Error: In sending ACK to coordinator\n");
403                                 if (transinfo->objlocked != NULL) {
404                                         free(transinfo->objlocked);
405                                 }
406                                 if (transinfo->objnotfound != NULL) {
407                                         free(transinfo->objnotfound);
408                                 }
409
410                                 return 1;
411                         }
412                         break;
413
414                 case TRANS_COMMIT:
415                         /* Invoke the transCommit process() */
416                         if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
417                                 printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
418                                 /* Free memory */
419                                 if (transinfo->objlocked != NULL) {
420                                         free(transinfo->objlocked);
421                                 }
422                                 if (transinfo->objnotfound != NULL) {
423                                         free(transinfo->objnotfound);
424                                 }
425                                 return 1;
426                         }
427                         break;
428
429                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
430                         break;
431                 default:
432                         printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
433                         //TODO Use fixed.trans_id  TID since Client may have died
434                         break;
435         }
436
437         /* Free memory */
438         if (transinfo->objlocked != NULL) {
439                 free(transinfo->objlocked);
440         }
441         if (transinfo->objnotfound != NULL) {
442                 free(transinfo->objnotfound);
443         }
444
445         return 0;
446 }
447
448 /* This function increments counters while running a voting decision on all objects involved 
449  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
450 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
451         int val, i = 0, j;
452         unsigned short version;
453         char control = 0, *ptr;
454         unsigned int oid;
455         unsigned int *oidnotfound, *oidlocked;
456         void *mobj;
457         objheader_t *headptr;
458
459         /* Counters and arrays to formulate decision on control message to be sent */
460         oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
461         oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
462         int objnotfound = 0, objlocked = 0;
463         int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
464
465         /* modptr points to the beginning of the object store 
466          * created at the Pariticipant. 
467          * Object store holds the modified objects involved in the transaction request */ 
468         ptr = (char *) modptr;
469         
470         /* Process each oid in the machine pile/ group per thread */
471         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
472                 if (i < fixed->numread) {//Objs only read and not modified
473                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
474                         incr *= i;
475                         oid = *((unsigned int *)(objread + incr));
476                         incr += sizeof(unsigned int);
477                         version = *((unsigned short *)(objread + incr));
478                 } else {//Objs modified
479                   int tmpsize;
480                   headptr = (objheader_t *) ptr;
481                   oid = OID(headptr);
482                   version = headptr->version;
483                   GETSIZE(tmpsize, headptr);
484                   ptr += sizeof(objheader_t) + tmpsize;
485                 }
486                 
487                 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
488
489                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
490                         /* Save the oids not found and number of oids not found for later use */
491                         oidnotfound[objnotfound] = oid;
492                         objnotfound++;
493                 } else { /* If Obj found in machine (i.e. has not moved) */
494                         /* Check if Obj is locked by any previous transaction */
495                         if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {           
496                                 if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
497                                         v_matchlock++;
498                                 } else {/* If versions don't match ...HARD ABORT */
499                                         v_nomatch++;
500                                         /* Send TRANS_DISAGREE to Coordinator */
501                                         control = TRANS_DISAGREE;
502                                         if (objlocked > 0) {
503                                           for(j = 0; j < objlocked; j++) {
504                                                         if((headptr = mhashSearch(oidlocked[j])) == NULL) {
505                                                                 printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
506                                                                 return 0;
507                                                         }
508                                                         STATUS(headptr) &= ~(LOCK);
509                                                 }
510                                                 free(oidlocked);
511                                         }
512                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
513                                                 perror("Error in sending control to the Coordinator\n");
514                                                 return 0;
515                                         }
516                                         return control;
517                                 }
518                         } else {/* If Obj is not locked then lock object */
519                                 STATUS(((objheader_t *)mobj)) |= LOCK;
520                                 /* Save all object oids that are locked on this machine during this transaction request call */
521                                 oidlocked[objlocked] = OID(((objheader_t *)mobj));
522                                 objlocked++;
523                                 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
524                                         v_matchnolock++;
525                                 } else { /* If versions don't match ...HARD ABORT */
526                                         v_nomatch++;
527                                         control = TRANS_DISAGREE;
528                                         if (objlocked > 0) {
529                                                 for(j = 0; j < objlocked; j++) {
530                                                         if((headptr = mhashSearch(oidlocked[j])) == NULL) {
531                                                                 printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
532                                                                 return 0;
533                                                         }
534                                                         STATUS(headptr) &= ~(LOCK);
535                                                 }
536                                                 free(oidlocked);
537                                         }
538
539                                         /* Send TRANS_DISAGREE to Coordinator */
540                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
541                                                 perror("Error in sending control to the Coordinator\n");
542                                                 return 0;
543                                         }
544                                         
545                                         return control;
546                                 }
547                         }
548                 }
549         }
550         
551         /* Decide what control message to send to Coordinator */
552         if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
553                                         modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
554                 printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
555                 return 0;
556         }
557         
558         return control;
559
560 }
561 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
562  * to send to Coordinator based on the votes of oids involved in the transaction */
563 char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, 
564                 int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, 
565                 unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
566         int val;
567         char control = 0;
568
569         /* Condition to send TRANS_AGREE */
570         if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
571                 control = TRANS_AGREE;
572                 /* Send control message */
573                 if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
574                         perror("Error in sending control to Coordinator\n");
575                         return 0;
576                 }
577         }
578         /* Condition to send TRANS_SOFT_ABORT */
579         if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
580                 control = TRANS_SOFT_ABORT;
581
582                 /* Send control message */
583                 if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
584                         perror("Error in sending TRANS_SOFT_ABORT control\n");
585                         return 0;
586                 }
587
588                 /* Send number of oids not found and the missing oids if objects are missing in the machine */
589                 if(*(objnotfound) != 0) { 
590                         int msg[1];
591                         msg[0] = *(objnotfound);
592                         if((val = send(acceptfd, msg, sizeof(int) ,MSG_NOSIGNAL)) < sizeof(int)) {
593                                 perror("Error in sending objects that are not found\n");
594                                 return 0;
595                         }
596                         int size = sizeof(unsigned int)* *(objnotfound);
597                         if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
598                                 perror("Error in sending objects that are not found\n");
599                                 return 0;
600                         }
601                 }
602         }
603
604         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
605          * if Participant receives a TRANS_COMMIT */
606         transinfo->objlocked = oidlocked;
607         transinfo->objnotfound = oidnotfound;
608         transinfo->modptr = modptr;
609         transinfo->numlocked = *(objlocked);
610         transinfo->numnotfound = *(objnotfound);
611         
612         return control;
613 }
614
615 /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer 
616  * addresses in lookup table and also changes version number
617  * Sends an ACK back to Coordinator */
618 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
619         objheader_t *header;
620         objheader_t *newheader;
621         int i = 0, offset = 0;
622         char control;
623         int tmpsize;
624
625         /* Process each modified object saved in the mainobject store */
626         for(i = 0; i < nummod; i++) {
627                 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
628                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
629                         return 1;
630                 }
631                 GETSIZE(tmpsize,header);
632                 pthread_mutex_lock(&mainobjstore_mutex);
633                 memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
634                 header->version += 1; 
635                 /* If threads are waiting on this object to be updated, notify them */
636                 if(header->notifylist != NULL) {
637                         notifyAll(&header->notifylist, OID(header), header->version);
638                 }
639                 pthread_mutex_unlock(&mainobjstore_mutex);
640                 offset += sizeof(objheader_t) + tmpsize;
641         }
642
643         if (nummod > 0)
644                 free(modptr);
645
646         /* Unlock locked objects */
647         for(i = 0; i < numlocked; i++) {
648                 if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
649                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
650                         return 1;
651                 }
652                 STATUS(header) &= ~(LOCK);
653         }
654         //TODO Update location lookup table
655
656         /* Send ack to coordinator */
657         control = TRANS_SUCESSFUL;
658         if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
659                 perror("Error sending ACK to coordinator\n");
660                 return 1;
661         }
662         
663         return 0;
664 }
665
666 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
667  * Looks for the objects to be prefetched in the main object store.
668  * If objects are not found then record those and if objects are found
669  * then use offset values to prefetch references to other objects */
670
671 int prefetchReq(int acceptfd) {
672         int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
673         int isArray = 0;
674         unsigned int oid, index = 0;
675         char *ptr, buffer[PRE_BUF_SIZE];
676         void *mobj;
677         unsigned int objoid;
678         char control;
679         objheader_t * header;
680         int bytesRecvd;
681
682         /* Repeatedly recv one oid and offset pair sent for prefetch */
683         while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
684                 count++;
685                 if(length == -1)
686                         break;
687                 index = 0;  
688                 bytesRecvd = 0;
689                 do {
690                         bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
691                                         sizeof(unsigned int) - bytesRecvd, 0);
692                 } while (bytesRecvd < sizeof(unsigned int));
693                 numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
694                 N = numoffset * sizeof(short);
695                 short offset[numoffset];
696                 ptr = (char *)&offset;
697                 sum = 0;
698                 /* Recv the offset values per oid */ 
699                 do {
700                         n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); 
701                         sum += n; 
702                 } while(sum < N && n != 0);     
703
704                 bzero(&buffer, PRE_BUF_SIZE);
705                 /* Process each oid */
706                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
707                         /* Save the oids not found in buffer for later use */
708                         *(buffer + index) = OBJECT_NOT_FOUND;
709                         index += sizeof(char);
710                         *((unsigned int *)(buffer+index)) = oid;
711                         index += sizeof(unsigned int);
712                 } else { /* If Obj found in machine (i.e. has not moved) */
713                         /* send the oid, it's size, it's header and data */
714                         header = (objheader_t *)mobj;
715                         GETSIZE(size, header);
716                         size += sizeof(objheader_t);
717                         *(buffer + index) = OBJECT_FOUND;
718                         index += sizeof(char);
719                         *((unsigned int *)(buffer+index)) = oid;
720                         index += sizeof(unsigned int);
721                         *((int *)(buffer+index)) = size;
722                         index += sizeof(int);
723                         memcpy(buffer + index, header, size);
724                         index += size;
725                         /* Calculate the oid corresponding to the offset value */
726                         for(i = 0 ; i< numoffset ; i++) {
727                                 /* Check for arrays  */
728                                 if(TYPE(header) > NUMCLASSES) {
729                                         isArray = 1;
730                                 }
731                                 if(isArray == 1) {
732                                         int elementsize = classsize[TYPE(header)];
733                                         objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i])));
734                                 } else {
735                                         objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i]));
736                                 }
737                                 if((header = mhashSearch(objoid)) == NULL) {
738                                         /* Obj not found, send oid */
739                                         *(buffer + index) = OBJECT_NOT_FOUND;
740                                         index += sizeof(char);
741                                         *((unsigned int *)(buffer+index)) = objoid;
742                                         index += sizeof(unsigned int);
743                                         break;
744                                 } else {/* Obj Found */
745                                         /* send the oid, it's size, it's header and data */
746                                         GETSIZE(size, header);
747                                         size+=sizeof(objheader_t);
748                                         *(buffer+index) = OBJECT_FOUND;
749                                         index += sizeof(char);
750                                         *((unsigned int *)(buffer+index)) = objoid;
751                                         index += sizeof(unsigned int);
752                                         *((int *)(buffer+index)) = size;
753                                         index += sizeof(int);
754                                         memcpy(buffer+index, header, size);
755                                         index += size;
756                                         isArray = 0;
757                                         continue;
758                                 }
759                         }
760                 }
761
762                 /* Check for overflow in the buffer */
763                 if (index >= PRE_BUF_SIZE) {
764                         printf("Error: Buffer array overflow %s, %d\n", __FILE__, __LINE__);
765                         return 1;
766                 }
767                 /* Send Prefetch response control message only once*/
768                 if(count == 1){
769                         control = TRANS_PREFETCH_RESPONSE;
770                         if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
771                                 perror("Error: in sending PREFETCH RESPONSE to Coordinator\n");
772                                 return 1;
773                         }
774                 }
775
776                 //Send buffer size 
777                 if((numbytes = send(acceptfd, &index, sizeof(unsigned int), MSG_NOSIGNAL)) < sizeof(unsigned int)) {
778                         perror("Error: in sending PREFETCH RESPONSE to Coordinator\n");
779                         return 1;
780                 }
781
782                 /* Send the entire buffer with its size and oids found and not found */
783                 if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index)) {
784                         perror("Error: sending oids found\n");
785                         return 1;
786                 }
787         }
788         return 0;
789 }
790
791 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
792         objheader_t *header;
793         unsigned int oid;
794         unsigned short newversion;
795         char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
796         int sd;
797         struct sockaddr_in remoteAddr;
798         int bytesSent;
799         int status, size;
800
801         int i = 0;
802         while(i < numoid) {
803                 oid = *(oidarry + i);
804                 if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
805                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
806                         return;
807                 } else {
808                         /* Check to see if versions are same */
809 checkversion:
810                         if ((STATUS(header) & LOCK) != LOCK) {          
811                                 //FIXME make locking atomic
812                                 STATUS(header) |= LOCK;
813                                 newversion = header->version;
814                                 if(newversion == *(versionarry + i)) {
815                                         //Add to the notify list 
816                                         if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
817                                                 printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); 
818                                                 return;
819                                         }
820                                         STATUS(header) &= ~(LOCK);              
821                                 } else {
822                                         STATUS(header) &= ~(LOCK);              
823                                         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
824                                                 perror("processReqNotify():socket()");
825                                                 return;
826                                         }
827                                         bzero(&remoteAddr, sizeof(remoteAddr));
828                                         remoteAddr.sin_family = AF_INET;
829                                         remoteAddr.sin_port = htons(LISTEN_PORT);
830                                         remoteAddr.sin_addr.s_addr = htonl(mid);
831
832                                         if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
833                                                 printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
834                                                                 inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
835                                                 status = -1;
836                                         } else {
837                                                 //Send Update notification
838                                                 msg[0] = THREAD_NOTIFY_RESPONSE;
839                                                 *((unsigned int *)&msg[1]) = oid;
840                                                 size = sizeof(unsigned int);
841                                                 *((unsigned short *)(&msg[1]+size)) = newversion;
842                                                 size += sizeof(unsigned short);
843                                                 *((unsigned int *)(&msg[1]+size)) = threadid;
844                                                 bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
845                                                 if (bytesSent < 0){
846                                                         perror("processReqNotify():send()");
847                                                         status = -1;
848                                                 } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){
849                                                         printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n", 
850                                                                         bytesSent, __FILE__, __LINE__);
851                                                         status = -1;
852                                                 } else {
853                                                         status = 0;
854                                                 }
855
856                                         }
857                                         close(sd);
858                                 }
859                         } else {
860                                 randomdelay();
861                                 goto checkversion;
862                         }
863                 }
864                 i++;
865         }
866         free(oidarry);
867         free(versionarry);
868 }
869