small changes
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 /* Coordinator => Machine that initiates the transaction request call for commiting a transaction
2  * Participant => Machines that host the objects involved in a transaction commit */
3
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <pthread.h>
8 #include <netdb.h>
9 #include <fcntl.h>
10 #include <errno.h>
11 #include <string.h>
12 #include "dstm.h"
13 #include "mlookup.h"
14 #include "llookup.h"
15 #include "threadnotify.h"
16 #ifdef COMPILER
17 #include "thread.h"
18 #endif
19
20
21 #define LISTEN_PORT 2156
22 #define BACKLOG 10 //max pending connections
23 #define RECEIVE_BUFFER_SIZE 2048
24
25 extern int classsize[];
26
27 objstr_t *mainobjstore;
28 pthread_mutex_t mainobjstore_mutex;
29 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
30 /**********************************************************
31  * Global variables to map socketid and remote mid
32  * to  resuse sockets
33  **************************************************/
34 midSocketInfo_t sockArray[NUM_MACHINES];
35 int sockCount; //number of connections with all remote machines(one socket per mc)
36 int sockIdFound; //track if socket file descriptor is already established
37 pthread_mutex_t sockLock = PTHREAD_MUTEX_INITIALIZER; //lock to prevent global sock variables to be inconsistent
38
39 /* This function initializes the main objects store and creates the 
40  * global machine and location lookup table */
41
42 int dstmInit(void)
43 {
44         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
45         /* Initialize attribute for mutex */
46         pthread_mutexattr_init(&mainobjstore_mutex_attr);
47         pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
48         pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
49         if (mhashCreate(HASH_SIZE, LOADFACTOR))
50                 return 1; //failure
51         
52         if (lhashCreate(HASH_SIZE, LOADFACTOR))
53                 return 1; //failure
54
55         if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
56                 return 1; //failure
57         
58         //Initialize mid to socketid mapping array
59         int t;
60         sockCount = 0;
61         for(t = 0; t < NUM_MACHINES; t++) {
62                 sockArray[t].mid = 0;
63                 sockArray[t].sockid = 0;
64         }
65
66         return 0;
67 }
68
69 /* This function starts the thread to listen on a socket 
70  * for tranaction calls */
71 void *dstmListen()
72 {
73         int listenfd, acceptfd;
74         struct sockaddr_in my_addr;
75         struct sockaddr_in client_addr;
76         socklen_t addrlength = sizeof(struct sockaddr);
77         pthread_t thread_dstm_accept;
78         int i;
79         int setsockflag=1;
80
81         listenfd = socket(AF_INET, SOCK_STREAM, 0);
82         if (listenfd == -1)
83         {
84                 perror("socket");
85                 exit(1);
86         }
87
88         if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
89           perror("socket");
90           exit(1);
91         }
92 #ifdef MAC
93         if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
94           perror("socket");
95           exit(1);
96         }
97 #endif
98
99         my_addr.sin_family = AF_INET;
100         my_addr.sin_port = htons(LISTEN_PORT);
101         my_addr.sin_addr.s_addr = INADDR_ANY;
102         memset(&(my_addr.sin_zero), '\0', 8);
103
104         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
105         {
106                 perror("bind");
107                 exit(1);
108         }
109         
110         if (listen(listenfd, BACKLOG) == -1)
111         {
112                 perror("listen");
113                 exit(1);
114         }
115
116         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
117         while(1)
118         {
119           int retval;
120           acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
121           do {
122             retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
123           } while(retval!=0);
124           pthread_detach(thread_dstm_accept);
125         }
126 }
127 /* This function accepts a new connection request, decodes the control message in the connection 
128  * and accordingly calls other functions to process new requests */
129 void *dstmAccept(void *acceptfd) {
130   int val, retval, size, sum, sockid;
131   unsigned int oid;
132   char *buffer;
133   char control,ctrl;
134   char *ptr;
135   void *srcObj;
136   objheader_t *h;
137   trans_commit_data_t transinfo;
138   unsigned short objType, *versionarry, version;
139   unsigned int *oidarry, numoid, mid, threadid;
140   
141   transinfo.objlocked = NULL;
142   transinfo.objnotfound = NULL;
143   transinfo.modptr = NULL;
144   transinfo.numlocked = 0;
145   transinfo.numnotfound = 0;
146   
147   /* Receive control messages from other machines */
148   while(true) {
149     int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
150     if (ret==-1)
151       break;
152     switch(control) {
153     case READ_REQUEST:
154       /* Read oid requested and search if available */
155       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
156       if((srcObj = mhashSearch(oid)) == NULL) {
157         printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
158         break;
159       }
160       h = (objheader_t *) srcObj;
161       GETSIZE(size, h);
162       size += sizeof(objheader_t);
163       sockid = (int) acceptfd;
164       
165       if (h == NULL) {
166         ctrl = OBJECT_NOT_FOUND;
167         send_data(sockid, &ctrl, sizeof(char));
168       } else {
169         /* Type */
170         char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
171         *((int *)&msg[1])=size;
172         send_data(sockid, &msg, sizeof(msg));
173         send_data(sockid, h, size);
174       }
175       break;
176       
177     case READ_MULT_REQUEST:
178       break;
179       
180     case MOVE_REQUEST:
181       break;
182       
183     case MOVE_MULT_REQUEST:
184       break;
185       
186     case TRANS_REQUEST:
187       /* Read transaction request */
188       if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
189         printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
190         pthread_exit(NULL);
191       }
192       break;
193     case TRANS_PREFETCH:
194       if((val = prefetchReq((int)acceptfd)) != 0) {
195         printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
196         break;
197       }
198       break;
199     case TRANS_PREFETCH_RESPONSE:
200       if((val = getPrefetchResponse((int) acceptfd)) != 0) {
201         printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
202         break;
203       }
204       break;
205     case START_REMOTE_THREAD:
206       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
207       objType = getObjType(oid);
208       startDSMthread(oid, objType);
209       break;
210       
211     case THREAD_NOTIFY_REQUEST:
212       recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
213       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
214       if((buffer = calloc(1,size)) == NULL) {
215         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
216         pthread_exit(NULL);
217       }
218       
219       recv_data((int)acceptfd, buffer, size);
220       
221       oidarry = calloc(numoid, sizeof(unsigned int)); 
222       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
223       size = sizeof(unsigned int) * numoid;
224       versionarry = calloc(numoid, sizeof(unsigned short));
225       memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
226       size += sizeof(unsigned short) * numoid;
227       mid = *((unsigned int *)(buffer+size));
228       size += sizeof(unsigned int);
229       threadid = *((unsigned int *)(buffer+size));
230       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
231       free(buffer);
232       
233       break;
234
235     case THREAD_NOTIFY_RESPONSE:
236       size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
237       if((buffer = calloc(1,size)) == NULL) {
238         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
239         pthread_exit(NULL);
240       }
241       
242       recv_data((int)acceptfd, buffer, size);
243       
244       oid = *((unsigned int *)buffer);
245       size = sizeof(unsigned int);
246       version = *((unsigned short *)(buffer+size));
247       size += sizeof(unsigned short);
248       threadid = *((unsigned int *)(buffer+size));
249       threadNotify(oid,version,threadid);
250       free(buffer);
251       break;
252
253     case CLOSE_CONNECTION:
254       goto closeconnection;
255
256     default:
257       printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
258     }
259   }
260
261  closeconnection:
262   /* Close connection */
263   if (close((int)acceptfd) == -1)
264     perror("close");
265   pthread_exit(NULL);
266 }
267   
268 /* This function reads the information available in a transaction request
269  * and makes a function call to process the request */
270 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
271         char *ptr;
272         void *modptr;
273         unsigned int *oidmod, oid;
274         fixed_data_t fixed;
275         objheader_t *headaddr;
276         int sum, i, size, n, val;
277
278         oidmod = NULL;
279
280         /* Read fixed_data_t data structure */ 
281         size = sizeof(fixed) - 1;
282         ptr = (char *)&fixed;;
283         fixed.control = TRANS_REQUEST;
284         recv_data((int)acceptfd, ptr+1, size);
285
286         /* Read list of mids */
287         int mcount = fixed.mcount;
288         size = mcount * sizeof(unsigned int);
289         unsigned int listmid[mcount];
290         ptr = (char *) listmid;
291         recv_data((int)acceptfd, ptr, size);
292         
293         /* Read oid and version tuples for those objects that are not modified in the transaction */
294         int numread = fixed.numread;
295         size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
296         char objread[size];
297         if(numread != 0) { //If pile contains more than one object to be read, 
298                           // keep reading all objects
299                 recv_data((int)acceptfd, objread, size);        
300         }
301         
302         /* Read modified objects */
303         if(fixed.nummod != 0) {
304                 if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
305                         printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
306                         return 1;
307                 }
308                 size = fixed.sum_bytes;
309                 recv_data((int)acceptfd, modptr, size); 
310         }
311
312         /* Create an array of oids for modified objects */
313         oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
314         if (oidmod == NULL)
315         {
316                 printf("calloc error %s, %d\n", __FILE__, __LINE__);
317                 return 1;
318         }
319         ptr = (char *) modptr;
320         for(i = 0 ; i < fixed.nummod; i++) {
321           int tmpsize;
322           headaddr = (objheader_t *) ptr;
323           oid = OID(headaddr);
324           oidmod[i] = oid;
325           GETSIZE(tmpsize, headaddr);
326           ptr += sizeof(objheader_t) + tmpsize;
327         }
328         
329         /*Process the information read */
330         if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
331                 printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
332                 /* Free resources */
333                 if(oidmod != NULL) {
334                         free(oidmod);
335                 }
336                 return 1;
337         }
338
339         /* Free resources */
340         if(oidmod != NULL) {
341                 free(oidmod);
342         }
343
344         return 0;
345 }
346
347 /* This function processes the Coordinator's transaction request using "handleTransReq" 
348  * function and sends a reply to the co-ordinator.
349  * Following this it also receives a new control message from the co-ordinator and processes this message*/
350 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
351                 unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
352         char control, sendctrl, retval;
353         objheader_t *tmp_header;
354         void *header;
355         int  i = 0, val;
356
357         /* Send reply to the Coordinator */
358         if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
359                 printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
360                 return 1;
361         }
362
363         recv_data((int)acceptfd, &control, sizeof(char));
364         
365         /* Process the new control message */
366         switch(control) {
367                 case TRANS_ABORT:
368                         if (fixed->nummod > 0)
369                                 free(modptr);
370                         /* Unlock objects that was locked due to this transaction */
371                         for(i = 0; i< transinfo->numlocked; i++) {
372                                 if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
373                                         printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);// find the header address
374                                         return 1;
375                                 }
376                                 STATUS(((objheader_t *)header)) &= ~(LOCK);             
377                         }
378
379                         /* Send ack to Coordinator */
380                         sendctrl = TRANS_UNSUCESSFUL;
381                         send_data((int)acceptfd, &sendctrl, sizeof(char));
382                         break;
383
384                 case TRANS_COMMIT:
385                         /* Invoke the transCommit process() */
386                         if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
387                                 printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
388                                 /* Free memory */
389                                 if (transinfo->objlocked != NULL) {
390                                         free(transinfo->objlocked);
391                                 }
392                                 if (transinfo->objnotfound != NULL) {
393                                         free(transinfo->objnotfound);
394                                 }
395                                 return 1;
396                         }
397                         break;
398
399                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
400                         break;
401                 default:
402                         printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
403                         //TODO Use fixed.trans_id  TID since Client may have died
404                         break;
405         }
406
407         /* Free memory */
408         if (transinfo->objlocked != NULL) {
409                 free(transinfo->objlocked);
410         }
411         if (transinfo->objnotfound != NULL) {
412                 free(transinfo->objnotfound);
413         }
414
415         return 0;
416 }
417
418 /* This function increments counters while running a voting decision on all objects involved 
419  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
420 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
421         int val, i = 0, j;
422         unsigned short version;
423         char control = 0, *ptr;
424         unsigned int oid;
425         unsigned int *oidnotfound, *oidlocked;
426         void *mobj;
427         objheader_t *headptr;
428
429         /* Counters and arrays to formulate decision on control message to be sent */
430         oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
431         oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
432         int objnotfound = 0, objlocked = 0;
433         int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
434
435         /* modptr points to the beginning of the object store 
436          * created at the Pariticipant. 
437          * Object store holds the modified objects involved in the transaction request */ 
438         ptr = (char *) modptr;
439         
440         /* Process each oid in the machine pile/ group per thread */
441         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
442                 if (i < fixed->numread) {//Objs only read and not modified
443                         int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array
444                         incr *= i;
445                         oid = *((unsigned int *)(objread + incr));
446                         incr += sizeof(unsigned int);
447                         version = *((unsigned short *)(objread + incr));
448                 } else {//Objs modified
449                   int tmpsize;
450                   headptr = (objheader_t *) ptr;
451                   oid = OID(headptr);
452                   version = headptr->version;
453                   GETSIZE(tmpsize, headptr);
454                   ptr += sizeof(objheader_t) + tmpsize;
455                 }
456                 
457                 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
458
459                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
460                         /* Save the oids not found and number of oids not found for later use */
461                         oidnotfound[objnotfound] = oid;
462                         objnotfound++;
463                 } else { /* If Obj found in machine (i.e. has not moved) */
464                         /* Check if Obj is locked by any previous transaction */
465                         if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {           
466                                 if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
467                                         v_matchlock++;
468                                 } else {/* If versions don't match ...HARD ABORT */
469                                         v_nomatch++;
470                                         /* Send TRANS_DISAGREE to Coordinator */
471                                         control = TRANS_DISAGREE;
472                                         if (objlocked > 0) {
473                                           for(j = 0; j < objlocked; j++) {
474                                                         if((headptr = mhashSearch(oidlocked[j])) == NULL) {
475                                                                 printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
476                                                                 return 0;
477                                                         }
478                                                         STATUS(headptr) &= ~(LOCK);
479                                                 }
480                                                 free(oidlocked);
481                                         }
482                                         send_data(acceptfd, &control, sizeof(char));
483                                         return control;
484                                 }
485                         } else {/* If Obj is not locked then lock object */
486                                 STATUS(((objheader_t *)mobj)) |= LOCK;
487                                 /* Save all object oids that are locked on this machine during this transaction request call */
488                                 oidlocked[objlocked] = OID(((objheader_t *)mobj));
489                                 objlocked++;
490                                 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
491                                         v_matchnolock++;
492                                 } else { /* If versions don't match ...HARD ABORT */
493                                         v_nomatch++;
494                                         control = TRANS_DISAGREE;
495                                         if (objlocked > 0) {
496                                                 for(j = 0; j < objlocked; j++) {
497                                                         if((headptr = mhashSearch(oidlocked[j])) == NULL) {
498                                                                 printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
499                                                                 return 0;
500                                                         }
501                                                         STATUS(headptr) &= ~(LOCK);
502                                                 }
503                                                 free(oidlocked);
504                                         }
505
506                                         /* Send TRANS_DISAGREE to Coordinator */
507                                         send_data(acceptfd, &control, sizeof(char));
508                                         return control;
509                                 }
510                         }
511                 }
512         }
513         
514         /* Decide what control message to send to Coordinator */
515         if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
516                                         modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
517                 printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
518                 return 0;
519         }
520         
521         return control;
522
523 }
524 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
525  * to send to Coordinator based on the votes of oids involved in the transaction */
526 char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, 
527                 int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, 
528                 unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
529         int val;
530         char control = 0;
531
532         /* Condition to send TRANS_AGREE */
533         if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
534                 control = TRANS_AGREE;
535                 /* Send control message */
536                 send_data(acceptfd, &control, sizeof(char));
537         }
538         /* Condition to send TRANS_SOFT_ABORT */
539         if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
540                 control = TRANS_SOFT_ABORT;
541
542                 /* Send control message */
543                 send_data(acceptfd, &control, sizeof(char));
544         
545                 /* Send number of oids not found and the missing oids if objects are missing in the machine */
546                 if(*(objnotfound) != 0) { 
547                         int msg[1];
548                         msg[0] = *(objnotfound);
549                         send_data(acceptfd, &msg, sizeof(int));
550                         int size = sizeof(unsigned int)* *(objnotfound);
551                         send_data(acceptfd, oidnotfound, size);
552                 }
553         }
554
555         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
556          * if Participant receives a TRANS_COMMIT */
557         transinfo->objlocked = oidlocked;
558         transinfo->objnotfound = oidnotfound;
559         transinfo->modptr = modptr;
560         transinfo->numlocked = *(objlocked);
561         transinfo->numnotfound = *(objnotfound);
562         
563         return control;
564 }
565
566 /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer 
567  * addresses in lookup table and also changes version number
568  * Sends an ACK back to Coordinator */
569 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
570         objheader_t *header;
571         objheader_t *newheader;
572         int i = 0, offset = 0;
573         char control;
574         int tmpsize;
575
576         /* Process each modified object saved in the mainobject store */
577         for(i = 0; i < nummod; i++) {
578                 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
579                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
580                         return 1;
581                 }
582                 GETSIZE(tmpsize,header);
583                 pthread_mutex_lock(&mainobjstore_mutex);
584                 memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
585                 header->version += 1; 
586                 /* If threads are waiting on this object to be updated, notify them */
587                 if(header->notifylist != NULL) {
588                         notifyAll(&header->notifylist, OID(header), header->version);
589                 }
590                 pthread_mutex_unlock(&mainobjstore_mutex);
591                 offset += sizeof(objheader_t) + tmpsize;
592         }
593
594         if (nummod > 0)
595                 free(modptr);
596
597         /* Unlock locked objects */
598         for(i = 0; i < numlocked; i++) {
599                 if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
600                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
601                         return 1;
602                 }
603                 STATUS(header) &= ~(LOCK);
604         }
605         //TODO Update location lookup table
606
607         /* Send ack to coordinator */
608         control = TRANS_SUCESSFUL;
609         send_data((int)acceptfd, &control, sizeof(char));
610         return 0;
611 }
612
613 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
614  * Looks for the objects to be prefetched in the main object store.
615  * If objects are not found then record those and if objects are found
616  * then use offset values to prefetch references to other objects */
617
618 int prefetchReq(int acceptfd) {
619         int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
620         int length, sd = -1;
621         char *recvbuffer, *sendbuffer, control;
622         unsigned int oid, mid;
623         short *offsetarry;
624         objheader_t *header;
625         struct sockaddr_in remoteAddr;
626
627         do {
628                 recv_data((int)acceptfd, &length, sizeof(int));
629                 if(length != -1) {
630                         size = length - sizeof(int);
631                         if((recvbuffer = calloc(1, size)) == NULL) {
632                                 printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
633                                 return -1;
634                         }
635                         recv_data((int)acceptfd, recvbuffer, size);
636                         oid = *((unsigned int *) recvbuffer);
637                         mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
638                         size = size - (2 * sizeof(unsigned int));
639                         numoffset = size / sizeof(short);
640                         if((offsetarry = calloc(numoffset, sizeof(short))) == NULL) {
641                                 printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
642                                 free(recvbuffer);
643                                 return -1;
644                         }
645                         memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
646                         free(recvbuffer);
647                         pthread_mutex_lock(&sockLock);
648                         sockIdFound = 0;
649                         pthread_mutex_unlock(&sockLock);
650             /* If socket is already established then send data reusing socket */
651                         for(i = 0; i < NUM_MACHINES; i++) {
652                                 if(sockArray[i].mid == mid) {
653                                         sd = sockArray[i].sockid;
654                                         pthread_mutex_lock(&sockLock);
655                                         sockIdFound = 1;
656                                         pthread_mutex_unlock(&sockLock);
657                                         break;
658                                 }
659                         }
660
661                         if(sockIdFound == 0) {
662                                 if(sockCount < NUM_MACHINES) {
663                                         /* Create socket to send information */
664                                         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
665                                                 perror("prefetchReq():socket()");
666                                                 return -1;
667                                         }
668                                         bzero(&remoteAddr, sizeof(remoteAddr));
669                                         remoteAddr.sin_family = AF_INET;
670                                         remoteAddr.sin_port = htons(LISTEN_PORT);
671                                         remoteAddr.sin_addr.s_addr = htonl(mid);
672
673                                         if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
674                                                 perror("connect");
675                                                 printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
676                                                                 inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
677                                                 close(sd);
678                                                 return -1;
679                                         }
680                                         sockArray[sockCount].mid = mid;
681                                         sockArray[sockCount].sockid = sd;
682                                         pthread_mutex_lock(&sockLock);
683                                         sockCount++;
684                                         pthread_mutex_unlock(&sockLock);
685                                 } else {
686                                         //TODO Fix for connecting to more than 2 machines && close socket
687                                         printf("%s(): Error: Currently works for only 2 machines\n", __func__);
688                                         return -1;
689                                 }
690                         }
691
692                         /*Process each oid */
693                         if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
694                                 /* Save the oids not found in buffer for later use */
695                                 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
696                                 if((sendbuffer = calloc(1, size)) == NULL) {
697                                         printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
698                                         free(offsetarry);
699                                         close(sd);
700                                         return -1;
701                                 }
702                                 *((int *) sendbuffer) = size;
703                                 *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
704                                 *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
705
706                                 control = TRANS_PREFETCH_RESPONSE;
707                                 if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
708                                         free(offsetarry);
709                                         printf("Error: %s() in sending prefetch response at %s, %d\n",
710                                                         __func__, __FILE__, __LINE__);
711                                         close(sd);
712                                         return -1;
713                                 }
714                         } else { /* Object Found */
715                                 int incr = 0;
716                                 GETSIZE(objsize, header);
717                                 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
718                                 if((sendbuffer = calloc(1, size)) == NULL) {
719                                         printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
720                                         free(offsetarry);
721                                         close(sd);
722                                         return -1;
723                                 }
724                                 *((int *) (sendbuffer + incr)) = size;
725                                 incr += sizeof(int);
726                                 *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
727                                 incr += sizeof(char);
728                                 *((unsigned int *)(sendbuffer+incr)) = oid;
729                                 incr += sizeof(unsigned int);
730                                 memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
731
732                                 control = TRANS_PREFETCH_RESPONSE;
733                                 if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
734                                         free(offsetarry);
735                                         printf("Error: %s() in sending prefetch response at %s, %d\n",
736                                                         __func__, __FILE__, __LINE__);
737                                         close(sd);
738                                         return -1;
739                                 }
740
741                                 /* Calculate the oid corresponding to the offset value */
742                                 for(i = 0 ; i< numoffset ; i++) {
743                                         /* Check for arrays  */
744                                         if(TYPE(header) > NUMCLASSES) {
745                                                 isArray = 1;
746                                         }
747                                         if(isArray == 1) {
748                                                 int elementsize = classsize[TYPE(header)];
749                                                 struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
750                                                 unsigned short length = ao->___length___;
751                                                 /* Check if array out of bounds */
752                                                 if(offsetarry[i]< 0 || offsetarry[i] >= length) {
753                                                         break;
754                                                 }
755                                                 oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
756                                         } else {
757                                                 oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
758                                         }
759
760                                         if((header = mhashSearch(oid)) == NULL) {
761                                                 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
762                                                 if((sendbuffer = calloc(1, size)) == NULL) {
763                                                         printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
764                                                         free(offsetarry);
765                                                         close(sd);
766                                                         return -1;
767                                                 }
768                                                 *((int *) sendbuffer) = size;
769                                                 *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
770                                                 *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
771
772                                                 control = TRANS_PREFETCH_RESPONSE;
773                                                 if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
774                                                         free(offsetarry);
775                                                         printf("Error: %s() in sending prefetch response at %s, %d\n",
776                                                                         __FILE__, __LINE__);
777                                                         close(sd);
778                                                         return -1;
779                                                 }
780                                                 break;
781                                         } else {/* Obj Found */
782                                                 int incr = 0;
783                                                 GETSIZE(objsize, header);
784                                                 size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
785                                                 if((sendbuffer = calloc(1, size)) == NULL) {
786                                                         printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
787                                                         free(offsetarry);
788                                                         close(sd);
789                                                         return -1;
790                                                 }
791                                                 *((int *) (sendbuffer + incr)) = size;
792                                                 incr += sizeof(int);
793                                                 *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
794                                                 incr += sizeof(char);
795                                                 *((unsigned int *)(sendbuffer+incr)) = oid;
796                                                 incr += sizeof(unsigned int);
797                                                 memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
798
799                                                 control = TRANS_PREFETCH_RESPONSE;
800                                                 if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
801                                                         free(offsetarry);
802                                                         printf("Error: %s() in sending prefetch response at %s, %d\n",
803                                                                         __func__, __FILE__, __LINE__);
804                                                         close(sd);
805                                                         return -1;
806                                                 }
807                                         }
808                                         isArray = 0;
809                                 }
810                                 free(offsetarry);
811                         }
812                 }
813         } while (length != -1);
814         return 0;
815 }
816
817 int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
818         int numbytes = 0;
819
820         send_data(sd, control, sizeof(char));
821         /* Send the buffer with its size */
822         int length = *(size);
823         send_data(sd, sendbuffer, length);
824         free(sendbuffer);
825         return 0;
826 }
827
828 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
829         objheader_t *header;
830         unsigned int oid;
831         unsigned short newversion;
832         char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
833         int sd;
834         struct sockaddr_in remoteAddr;
835         int bytesSent;
836         int size;
837
838         int i = 0;
839         while(i < numoid) {
840                 oid = *(oidarry + i);
841                 if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
842                         printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
843                         return;
844                 } else {
845                         /* Check to see if versions are same */
846 checkversion:
847                         if ((STATUS(header) & LOCK) != LOCK) {          
848                                 //FIXME make locking atomic
849                                 STATUS(header) |= LOCK;
850                                 newversion = header->version;
851                                 if(newversion == *(versionarry + i)) {
852                                         //Add to the notify list 
853                                         if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
854                                                 printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__); 
855                                                 return;
856                                         }
857                                         STATUS(header) &= ~(LOCK);              
858                                 } else {
859                                         STATUS(header) &= ~(LOCK);              
860                                         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
861                                                 perror("processReqNotify():socket()");
862                                                 return;
863                                         }
864                                         bzero(&remoteAddr, sizeof(remoteAddr));
865                                         remoteAddr.sin_family = AF_INET;
866                                         remoteAddr.sin_port = htons(LISTEN_PORT);
867                                         remoteAddr.sin_addr.s_addr = htonl(mid);
868
869                                         if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
870                                                 printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
871                                                                 inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
872                                                 close(sd);
873                                                 return;
874                                         } else {
875                                                 //Send Update notification
876                                                 msg[0] = THREAD_NOTIFY_RESPONSE;
877                                                 *((unsigned int *)&msg[1]) = oid;
878                                                 size = sizeof(unsigned int);
879                                                 *((unsigned short *)(&msg[1]+size)) = newversion;
880                                                 size += sizeof(unsigned short);
881                                                 *((unsigned int *)(&msg[1]+size)) = threadid;
882                                                 size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
883                                                 send_data(sd, msg, size);
884                                         }
885                                         close(sd);
886                                 }
887                         } else {
888                                 randomdelay();
889                                 goto checkversion;
890                         }
891                 }
892                 i++;
893         }
894         free(oidarry);
895         free(versionarry);
896 }