Object creation within transaction works now. Yay!
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 /* Coordinator => Machine that initiates the transaction request call for commiting a transaction
2  * Participant => Machines that host the objects involved in a transaction commit */
3
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <pthread.h>
8 #include <netdb.h>
9 #include <fcntl.h>
10 #include "dstm.h"
11 #include "mlookup.h"
12 #include "llookup.h"
13
14 #define LISTEN_PORT 2156
15 #define BACKLOG 10 //max pending connections
16 #define RECEIVE_BUFFER_SIZE 2048
17 #define PRE_BUF_SIZE 2048
18
19 extern int classsize[];
20
21 objstr_t *mainobjstore;
22
23 /* This function initializes the main objects store and creates the 
24  * global machine and location lookup table */
25
26 int dstmInit(void)
27 {
28         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);    
29         if (mhashCreate(HASH_SIZE, LOADFACTOR))
30                 return 1; //failure
31         
32         if (lhashCreate(HASH_SIZE, LOADFACTOR))
33                 return 1; //failure
34         
35         return 0;
36 }
37
38 /* This function starts the thread to listen on a socket 
39  * for tranaction calls */
40 void *dstmListen()
41 {
42         int listenfd, acceptfd;
43         struct sockaddr_in my_addr;
44         struct sockaddr_in client_addr;
45         socklen_t addrlength = sizeof(struct sockaddr);
46         pthread_t thread_dstm_accept;
47         int i;
48         int setsockflag=1;
49
50         listenfd = socket(AF_INET, SOCK_STREAM, 0);
51         if (listenfd == -1)
52         {
53                 perror("socket");
54                 exit(1);
55         }
56
57         if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
58           perror("socket");
59           exit(1);
60         }
61 #ifdef MAC
62         if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
63           perror("socket");
64           exit(1);
65         }
66 #endif
67
68         my_addr.sin_family = AF_INET;
69         my_addr.sin_port = htons(LISTEN_PORT);
70         my_addr.sin_addr.s_addr = INADDR_ANY;
71         memset(&(my_addr.sin_zero), '\0', 8);
72
73         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
74         {
75                 perror("bind");
76                 exit(1);
77         }
78         
79         if (listen(listenfd, BACKLOG) == -1)
80         {
81                 perror("listen");
82                 exit(1);
83         }
84
85         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
86         while(1)
87         {
88                 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
89                 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
90         }
91         pthread_exit(NULL);
92 }
93 /* This function accepts a new connection request, decodes the control message in the connection 
94  * and accordingly calls other functions to process new requests */
95 void *dstmAccept(void *acceptfd)
96 {
97         int numbytes,i, val, retval;
98         unsigned int oid;
99         char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
100         char *ptr;
101         void *srcObj;
102         objheader_t *h;
103         trans_commit_data_t transinfo;
104         unsigned short objType;
105         
106         int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
107
108         printf("Recieved connection: fd = %d\n", (int)acceptfd);
109         /* Receive control messages from other machines */
110         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
111                 if (retval == 0) {
112                         return; // Testing connection
113                 }
114                 perror("Error in receiving control from coordinator\n");
115                 return;
116         }
117         
118         switch(control) {
119                 case READ_REQUEST:
120                         /* Read oid requested and search if available */
121                         if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
122                                 perror("Error receiving object from cooridnator\n");
123                                 return NULL;
124                         }
125                         srcObj = mhashSearch(oid);
126                         h = (objheader_t *) srcObj;
127                         size = sizeof(objheader_t) + sizeof(classsize[TYPE(h)]);
128                         if (h == NULL) {
129                                 ctrl = OBJECT_NOT_FOUND;
130                                 if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
131                                         perror("Error sending control msg to coordinator\n");
132                                         return NULL;
133                                 }
134                         } else {
135                                 /* Type */
136                           char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
137                           *((int *)&msg[1])=size;
138                           if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
139                                   perror("Error sending size of object to coordinator\n");
140                                   return NULL;
141                           }
142                           if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
143                                   perror("Error in sending object\n");
144                                   return NULL;
145                           }
146                         }
147                         break;
148                 
149                 case READ_MULT_REQUEST:
150                         printf("DEBUG-> READ_MULT_REQUEST\n");
151                         break;
152         
153                 case MOVE_REQUEST:
154                         printf("DEBUG -> MOVE_REQUEST\n");
155                         break;
156
157                 case MOVE_MULT_REQUEST:
158                         printf("DEBUG -> MOVE_MULT_REQUEST\n");
159                         break;
160
161                 case TRANS_REQUEST:
162                         /* Read transaction request */
163                         printf("DEBUG -> Recv TRANS_REQUEST\n");
164                         if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
165                                 printf("Error in readClientReq\n");
166                                 return;
167                         }
168                         break;
169                 case TRANS_PREFETCH:
170                         printf("DEBUG -> Recv TRANS_PREFETCH\n");
171                         if((val = prefetchReq((int)acceptfd)) != 0) {
172                                 printf("Error in readClientReq\n");
173                                 return;
174                         }
175                         break;
176                 case START_REMOTE_THREAD:
177                         retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
178                         if (retval <= 0)
179                                 perror("dstmAccept(): error receiving START_REMOTE_THREAD msg");
180                         else if (retval != sizeof(unsigned int))
181                                 printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD\n",
182                                         retval);
183                         else
184                         { //TODO: execute run method on this global thread object
185                                 printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=%d\n", oid);
186                                 objType = getObjType(oid);
187                                 printf("dstmAccept(): type of object %d is %d\n", oid, objType);
188                         }
189                         break;
190
191                 default:
192                         printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
193         }
194
195         /* Close connection */
196         if (close((int)acceptfd) == -1)
197                 perror("close");
198         else 
199                 printf("Closed connection: fd = %d\n", (int)acceptfd);
200         
201         pthread_exit(NULL);
202 }
203
204 /* This function reads the information available in a transaction request
205  * and makes a function call to process the request */
206 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
207         char *ptr;
208         void *modptr;
209         unsigned int *oidmod, oid;
210         fixed_data_t fixed;
211         objheader_t *headaddr;
212         int sum = 0, i, N, n, val;
213
214         /* Read fixed_data_t data structure */ 
215         N = sizeof(fixed) - 1;
216         ptr = (char *)&fixed;;
217         fixed.control = TRANS_REQUEST;
218         do {
219                 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
220                 sum += n;
221         } while(sum < N && n != 0); 
222
223         /* Read list of mids */
224         int mcount = fixed.mcount;
225         N = mcount * sizeof(unsigned int);
226         unsigned int listmid[mcount];
227         ptr = (char *) listmid;
228         sum = 0;
229         do {
230                 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
231                 sum += n;
232         } while(sum < N && n != 0);
233
234         /* Read oid and version tuples for those objects that are not modified in the transaction */
235         int numread = fixed.numread;
236         N = numread * (sizeof(unsigned int) + sizeof(short));
237         char objread[N];
238         if(numread != 0) { //If pile contains more than one object to be read, 
239                           // keep reading all objects
240                 sum = 0;
241                 do {
242                         n = recv((int)acceptfd, (void *) objread, N, 0);
243                         sum += n;
244                 } while(sum < N && n != 0);
245         }
246         
247         /* Read modified objects */
248         if(fixed.nummod != 0) { // If pile contains more than one modified object,
249                                 // allocate new object store and recv all modified objects
250                                 // TODO deallocate this space
251                 if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
252                         printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
253                         return 1;
254                 }
255                 sum = 0;
256                 do { // Recv the objs that are modified by the Coordinator
257                         n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
258                         sum += n;
259                 } while (sum < fixed.sum_bytes && n != 0);
260         }
261
262         /* Create an array of oids for modified objects */
263         oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
264         ptr = (char *) modptr;
265         for(i = 0 ; i < fixed.nummod; i++) {
266                 headaddr = (objheader_t *) ptr;
267                 oid = OID(headaddr);
268                 oidmod[i] = oid;
269                 ptr += sizeof(objheader_t) + classsize[TYPE(headaddr)];
270         }
271         
272         /*Process the information read */
273         if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
274                 printf("Error in processClientReq %s, %d\n", __FILE__, __LINE__);
275                 return 1;
276         }
277
278
279         /* Free resources */
280         free(oidmod);
281
282         return 0;
283 }
284
285 /* This function processes the Coordinator's transaction request using "handleTransReq" 
286  * function and sends a reply to the co-ordinator.
287  * Following this it also receives a new control message from the co-ordinator and processes this message*/
288 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
289                 unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
290         char *ptr, control, sendctrl;
291         objheader_t *tmp_header;
292         void *header;
293         int  i = 0, val, retval;
294
295         /* Send reply to the Coordinator */
296         if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
297                 printf("Handle Trans Req error %s, %d\n", __FILE__, __LINE__);
298                 return 1;
299         }
300
301         /* Read new control message from Coordiator */
302         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
303                 perror("Error in receiving control message\n");
304                 return 1;
305         }
306
307         /* Process the new control message */
308         switch(control) {
309                 case TRANS_ABORT:
310                         /* Set all ref counts as 1 and do garbage collection */
311                         ptr = modptr;
312                         for(i = 0; i< fixed->nummod; i++) {
313                                 tmp_header = (objheader_t *)ptr;
314                                 tmp_header->rcount = 0;
315                                 ptr += sizeof(objheader_t) + classsize[TYPE(tmp_header)];
316                         }
317                         /* Unlock objects that was locked due to this transaction */
318                         for(i = 0; i< transinfo->numlocked; i++) {
319                                 header = mhashSearch(transinfo->objlocked[i]);// find the header address
320                                 STATUS(((objheader_t *)header)) &= ~(LOCK);             
321                         }
322                 
323                         /* Send ack to Coordinator */
324                         printf("DEBUG -> Recv TRANS_ABORT\n");
325                         sendctrl = TRANS_SUCESSFUL;
326                         if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
327                                 perror("Error sending ACK to coordinator\n");
328                                 return 1;
329                         }
330                         ptr = NULL;
331                         break;
332
333                 case TRANS_COMMIT:
334                         /* Invoke the transCommit process() */
335                         printf("DEBUG -> Recv TRANS_COMMIT \n");
336                         if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
337                                 printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
338                                 return 1;
339                         }
340                         break;
341
342                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
343                         //TODO expect another transrequest from client
344                         printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
345                         break;
346                 default:
347                         printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
348                         //TODO Use fixed.trans_id  TID since Client may have died
349                         break;
350         }
351         /* Free memory */
352         printf("DEBUG -> Freeing...\n");
353         fflush(stdout);
354         
355         if (transinfo->objlocked != NULL) {
356                 free(transinfo->objlocked);
357                 transinfo->objlocked = NULL;
358         }
359         if (transinfo->objnotfound != NULL) {
360                 free(transinfo->objnotfound);
361                 transinfo->objnotfound = NULL;
362         }
363         return 0;
364 }
365
366 /* This function increments counters while running a voting decision on all objects involved 
367  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
368 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
369         int val, i = 0;
370         short version;
371         char control = 0, *ptr;
372         unsigned int oid;
373         unsigned int *oidnotfound, *oidlocked, *oidmod;
374         void *mobj;
375         objheader_t *headptr;
376
377         /* Counters and arrays to formulate decision on control message to be sent */
378         oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
379         oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
380         int objnotfound = 0, objlocked = 0;
381         int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
382
383         /* modptr points to the beginning of the object store 
384          * created at the Pariticipant. 
385          * Object store holds the modified objects involved in the transaction request */ 
386         ptr = (char *) modptr;
387         
388         /* Process each oid in the machine pile/ group per thread */
389         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
390                 if (i < fixed->numread) {//Objs only read and not modified
391                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
392                         incr *= i;
393                         oid = *((unsigned int *)(objread + incr));
394                         incr += sizeof(unsigned int);
395                         version = *((short *)(objread + incr));
396                 } else {//Objs modified
397                         headptr = (objheader_t *) ptr;
398                         oid = OID(headptr);
399                         version = headptr->version;
400                         ptr += sizeof(objheader_t) + classsize[TYPE(headptr)];
401                 }
402                 
403                 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
404
405                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
406                         /* Save the oids not found and number of oids not found for later use */
407                         //oidnotfound[objnotfound] = OID(((objheader_t *)mobj));
408                         oidnotfound[objnotfound] = oid;
409                         objnotfound++;
410                 } else { /* If Obj found in machine (i.e. has not moved) */
411                         /* Check if Obj is locked by any previous transaction */
412                         if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {           
413                                 if (version == ((objheader_t *)mobj)->version) {      /* If not locked then match versions */
414                                         v_matchlock++;
415                                 } else {/* If versions don't match ...HARD ABORT */
416                                         v_nomatch++;
417                                         /* Send TRANS_DISAGREE to Coordinator */
418                                         control = TRANS_DISAGREE;
419                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
420                                                 perror("Error in sending control to the Coordinator\n");
421                                                 return 0;
422                                         }
423                                         printf("DEBUG -> Sending TRANS_DISAGREE\n");
424                                         return control;
425                                 }
426                         } else {/* If Obj is not locked then lock object */
427                                 STATUS(((objheader_t *)mobj)) |= LOCK;
428                                
429                                 /*TESTING Add random wait to make transactions run for a long time such that
430                                  * we can test for soft abort case */
431                         
432                                 randomdelay();
433
434                                 /* Save all object oids that are locked on this machine during this transaction request call */
435                                 oidlocked[objlocked] = OID(((objheader_t *)mobj));
436                                 objlocked++;
437                                 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
438                                         v_matchnolock++;
439                                 } else { /* If versions don't match ...HARD ABORT */
440                                         v_nomatch++;
441                                         control = TRANS_DISAGREE;
442                                         /* Send TRANS_DISAGREE to Coordinator */
443                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
444                                                 perror("Error in sending control to the Coordinator\n");
445                                                 return 0;
446                                         }
447                                         printf("DEBUG -> Sending TRANS_DISAGREE\n");
448                                         return control;
449                                 }
450                         }
451                 }
452         }
453         
454         /* Decide what control message to send to Coordinator */
455         if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
456                                         modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
457                 printf("Error in decideCtrlMessage %s, %d\n", __FILE__, __LINE__);
458                 return 0;
459         }
460         
461         return val;
462
463 }
464 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
465  * to send to Coordinator based on the votes of oids involved in the transaction */
466 int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, 
467                 int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, 
468                 unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
469         int val;
470         char control = 0;
471         /* Condition to send TRANS_AGREE */
472         if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
473                 control = TRANS_AGREE;
474                 /* Send control message */
475                 if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
476                         perror("Error in sending control to Coordinator\n");
477                         return 0;
478                 }
479                 printf("DEBUG -> Sending TRANS_AGREE\n");
480         }
481         /* Condition to send TRANS_SOFT_ABORT */
482         if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
483                 control = TRANS_SOFT_ABORT;
484                 char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
485                 *((int*)&msg[1])= *(objnotfound);
486
487                 printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
488                 /* Send control message */
489                 if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
490                         perror("Error in sending no of objects that are not found\n");
491                         return 0;
492                 }
493                 /* Send number of oids not found and the missing oids if objects are missing in the machine */
494                 if(*(objnotfound) != 0) { 
495                         int size = sizeof(unsigned int)* *(objnotfound);
496                         if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
497                                 perror("Error in sending objects that are not found\n");
498                                 return 0;
499                         }
500                 }
501         }
502
503         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
504          * if Participant receives a TRANS_COMMIT */
505         transinfo->objlocked = oidlocked;
506         transinfo->objnotfound = oidnotfound;
507         transinfo->modptr = modptr;
508         transinfo->numlocked = *(objlocked);
509         transinfo->numnotfound = *(objnotfound);
510         
511         return control;
512 }
513
514 /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer 
515  * addresses in lookup table and also changes version number
516  * Sends an ACK back to Coordinator */
517 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
518         objheader_t *header;
519         int i = 0, offset = 0;
520         char control;
521
522         /* Process each modified object saved in the mainobject store */
523         for(i = 0; i < nummod; i++) {
524                 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
525                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
526                         return 1;
527                 }
528                 /* Change reference count of older address and free space in objstr ?? */
529                 header->rcount = 0;
530
531                 /* Change ptr address in mhash table */
532                 mhashRemove(oidmod[i]);
533                 mhashInsert(oidmod[i], (((char *)modptr) + offset));
534                 offset += sizeof(objheader_t) + classsize[TYPE(header)];
535
536                 /* Update object version number */
537                 header = (objheader_t *) mhashSearch(oidmod[i]);
538                 header->version += 1; 
539         }
540         /* Unlock locked objects */
541         for(i = 0; i < numlocked; i++) {
542                 if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
543                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
544                         return 1;
545                 }
546                 STATUS(header) &= ~(LOCK);
547         }
548
549         //TODO Update location lookup table
550
551         /* Send ack to coordinator */
552         control = TRANS_SUCESSFUL;
553         printf("DEBUG-> TRANS_SUCESSFUL\n");
554         if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
555                 perror("Error sending ACK to coordinator\n");
556         }
557         
558         return 0;
559 }
560
561 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
562  * Looks for the objects to be prefetched in the main object store.
563  * If objects are not found then record those and if objects are found
564  * then use offset values to prefetch references to other objects */
565
566 int prefetchReq(int acceptfd) {
567         int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
568         unsigned int oid, index = 0;
569         char *ptr, buffer[PRE_BUF_SIZE];
570         void *mobj;
571         unsigned int objoid;
572         char *header, control;
573         objheader_t * head;
574         
575         /* Repeatedly recv the oid and offset pairs sent for prefetch */
576         while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
577                 count++;
578                 if(length == -1)
579                         break;
580                 sum = 0;
581                 index = sizeof(unsigned int); // Index starts with sizeof  unsigned int because the 
582                                               // first 4 bytes are saved to send the
583                                               // size of the buffer (that is computed at the end of the loop)
584                 oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
585                 numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
586                 N = numoffset * sizeof(short);
587                 short offset[numoffset];
588                 ptr = (char *)&offset;
589                 /* Recv the offset values per oid */ 
590                 do {
591                         n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); 
592                         sum += n; 
593                 } while(sum < N && n != 0);     
594
595                 /* Process each oid */
596                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
597                         /* Save the oids not found in buffer for later use */
598                         *(buffer + index) = OBJECT_NOT_FOUND;
599                         index += sizeof(char);
600                         memcpy(buffer+index, &oid, sizeof(unsigned int));
601                         index += sizeof(unsigned int);
602                 } else { /* If Obj found in machine (i.e. has not moved) */
603                         /* send the oid, it's size, it's header and data */
604                         header = (char *) mobj;
605                         head = (objheader_t *) header; 
606                         size = sizeof(objheader_t) + sizeof(classsize[TYPE(head)]);
607                         *(buffer + index) = OBJECT_FOUND;
608                         index += sizeof(char);
609                         memcpy(buffer+index, &oid, sizeof(unsigned int));
610                         index += sizeof(unsigned int);
611                         memcpy(buffer+index, &size, sizeof(int));
612                         index += sizeof(int);
613                         memcpy(buffer + index, header, size);
614                         index += size;
615                         /* Calculate the oid corresponding to the offset value */
616                         for(i = 0 ; i< numoffset ; i++) {
617                                 objoid = *((int *)(header + sizeof(objheader_t) + offset[i]));
618                                 if((header = (char *) mhashSearch(objoid)) == NULL) {
619                                         /* Obj not found, send oid */
620                                         *(buffer + index) = OBJECT_NOT_FOUND;
621                                         index += sizeof(char);
622                                         memcpy(buffer+index, &oid, sizeof(unsigned int));
623                                         index += sizeof(unsigned int);
624                                         break;
625                                 } else {/* Obj Found */
626                                         /* send the oid, it's size, it's header and data */
627                                         head = (objheader_t *) header; 
628                                         size = sizeof(objheader_t) + sizeof(classsize[TYPE(head)]);
629                                         *(buffer + index) = OBJECT_FOUND;
630                                         index += sizeof(char);
631                                         memcpy(buffer+index, &oid, sizeof(unsigned int));
632                                         index += sizeof(unsigned int);
633                                         memcpy(buffer+index, &size, sizeof(int));
634                                         index += sizeof(int);
635                                         memcpy(buffer + index, header, size);
636                                         index += size;
637                                         continue;
638                                 }
639                         }
640                 }
641                 /* Check for overflow in the buffer */
642                 if (index >= PRE_BUF_SIZE) {
643                         printf("Char buffer is overflowing\n");
644                         return 1;
645                 }
646                 /* Send Prefetch response control message only once*/
647                 if(count == 1) {
648                         control = TRANS_PREFETCH_RESPONSE;
649                         if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
650                                 perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
651                                 return 1;
652                         }
653                 }
654
655                 /* Add the buffer size into buffer as a parameter */
656                 memcpy(buffer, &index, sizeof(unsigned int));
657                 /* Send the entire buffer with its size and oids found and not found */
658                 if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) {
659                         perror("Error sending oids found\n");
660                         return 1;
661                 }
662         }
663         return 0;
664 }