Minor bug fix for trans abort case
[IRC.git] / Robust / src / Runtime / DSTM / interface / dstmserver.c
1 /* Coordinator => Machine that initiates the transaction request call for commiting a transaction
2  * Participant => Machines that host the objects involved in a transaction commit */
3
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #include <pthread.h>
8 #include <netdb.h>
9 #include <fcntl.h>
10 #include "dstm.h"
11 #include "mlookup.h"
12 #include "llookup.h"
13 #ifdef COMPILER
14 #include "thread.h"
15 #endif
16
17
18 #define LISTEN_PORT 2156
19 #define BACKLOG 10 //max pending connections
20 #define RECEIVE_BUFFER_SIZE 2048
21 #define PRE_BUF_SIZE 2048
22
23 extern int classsize[];
24
25 objstr_t *mainobjstore;
26 pthread_mutex_t mainobjstore_mutex;
27 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
28
29 /* This function initializes the main objects store and creates the 
30  * global machine and location lookup table */
31
32 int dstmInit(void)
33 {
34         mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
35         /* Initialize attribute for mutex */
36         pthread_mutexattr_init(&mainobjstore_mutex_attr);
37         pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
38         //pthread_mutex_init(&mainobjstore_mutex, NULL);
39         pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
40         if (mhashCreate(HASH_SIZE, LOADFACTOR))
41                 return 1; //failure
42         
43         if (lhashCreate(HASH_SIZE, LOADFACTOR))
44                 return 1; //failure
45         
46         return 0;
47 }
48
49 /* This function starts the thread to listen on a socket 
50  * for tranaction calls */
51 void *dstmListen()
52 {
53         int listenfd, acceptfd;
54         struct sockaddr_in my_addr;
55         struct sockaddr_in client_addr;
56         socklen_t addrlength = sizeof(struct sockaddr);
57         pthread_t thread_dstm_accept;
58         int i;
59         int setsockflag=1;
60
61         listenfd = socket(AF_INET, SOCK_STREAM, 0);
62         if (listenfd == -1)
63         {
64                 perror("socket");
65                 exit(1);
66         }
67
68         if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
69           perror("socket");
70           exit(1);
71         }
72 #ifdef MAC
73         if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
74           perror("socket");
75           exit(1);
76         }
77 #endif
78
79         my_addr.sin_family = AF_INET;
80         my_addr.sin_port = htons(LISTEN_PORT);
81         my_addr.sin_addr.s_addr = INADDR_ANY;
82         memset(&(my_addr.sin_zero), '\0', 8);
83
84         if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
85         {
86                 perror("bind");
87                 exit(1);
88         }
89         
90         if (listen(listenfd, BACKLOG) == -1)
91         {
92                 perror("listen");
93                 exit(1);
94         }
95
96         printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
97         while(1)
98         {
99                 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
100                 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
101         }
102 }
103 /* This function accepts a new connection request, decodes the control message in the connection 
104  * and accordingly calls other functions to process new requests */
105 void *dstmAccept(void *acceptfd)
106 {
107         int numbytes,i, val, retval;
108         unsigned int oid;
109         char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
110         char *ptr;
111         void *srcObj;
112         objheader_t *h;
113         trans_commit_data_t transinfo;
114         unsigned short objType;
115         
116         transinfo.objlocked = NULL;
117         transinfo.objnotfound = NULL;
118         transinfo.modptr = NULL;
119         transinfo.numlocked = 0;
120         transinfo.numnotfound = 0;
121
122         int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
123
124         /* Receive control messages from other machines */
125         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
126                 if (retval == 0) {
127                         pthread_exit(NULL); // Testing connection
128                 }
129                 perror("Error in receiving control from coordinator\n");
130                 pthread_exit(NULL);
131         }
132         
133         switch(control) {
134                 case READ_REQUEST:
135                         /* Read oid requested and search if available */
136                         if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
137                                 perror("Error receiving object from cooridnator\n");
138                                 pthread_exit(NULL);
139                         }
140                         if((srcObj = mhashSearch(oid)) == NULL) {
141                                 printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__);
142                         }
143                         h = (objheader_t *) srcObj;
144                         GETSIZE(size, h);
145                         size += sizeof(objheader_t);
146
147                         if (h == NULL) {
148                                 ctrl = OBJECT_NOT_FOUND;
149                                 if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
150                                         perror("Error sending control msg to coordinator\n");
151                                         pthread_exit(NULL);
152                                 }
153                         } else {
154                                 /* Type */
155                                 char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
156                                 *((int *)&msg[1])=size;
157                                 if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
158                                         perror("Error sending size of object to coordinator\n");
159                                         pthread_exit(NULL);
160                                 }
161                                 if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
162                                         perror("Error in sending object\n");
163                                         pthread_exit(NULL);
164                                 }
165                         }
166                         break;
167                 
168                 case READ_MULT_REQUEST:
169                         printf("DEBUG-> READ_MULT_REQUEST\n");
170                         break;
171         
172                 case MOVE_REQUEST:
173                         printf("DEBUG -> MOVE_REQUEST\n");
174                         break;
175
176                 case MOVE_MULT_REQUEST:
177                         printf("DEBUG -> MOVE_MULT_REQUEST\n");
178                         break;
179
180                 case TRANS_REQUEST:
181                         /* Read transaction request */
182                         printf("DEBUG -> Recv TRANS_REQUEST\n");
183                         if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
184                                 printf("Error in readClientReq\n");
185                                 pthread_exit(NULL);
186                         }
187                         break;
188                 case TRANS_PREFETCH:
189                         printf("DEBUG -> Recv TRANS_PREFETCH\n");
190                         if((val = prefetchReq((int)acceptfd)) != 0) {
191                                 printf("Error in readClientReq\n");
192                                 pthread_exit(NULL);
193                         }
194                         break;
195                 case START_REMOTE_THREAD:
196                         retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
197                         if (retval <= 0)
198                                 perror("dstmAccept(): error receiving START_REMOTE_THREAD msg");
199                         else if (retval != sizeof(unsigned int))
200                                 printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD\n",
201                                         retval);
202                         else
203                         { //TODO: execute run method on this global thread object
204                                 printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=0x%x\n", oid);
205                                 objType = getObjType(oid);
206                                 printf("dstmAccept(): type of object 0x%x is %d\n", oid, objType);
207                                 startDSMthread(oid, objType);
208
209
210                         }
211                         break;
212
213                 default:
214                         printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
215         }
216
217         /* Close connection */
218         if (close((int)acceptfd) == -1)
219                 perror("close");
220         
221         pthread_exit(NULL);
222 }
223
224 /* This function reads the information available in a transaction request
225  * and makes a function call to process the request */
226 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
227         char *ptr;
228         void *modptr;
229         unsigned int *oidmod, oid;
230         fixed_data_t fixed;
231         objheader_t *headaddr;
232         int sum = 0, i, N, n, val;
233
234         oidmod = NULL;
235
236         /* Read fixed_data_t data structure */ 
237         N = sizeof(fixed) - 1;
238         ptr = (char *)&fixed;;
239         fixed.control = TRANS_REQUEST;
240         do {
241                 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
242                 sum += n;
243         } while(sum < N && n != 0); 
244
245         /* Read list of mids */
246         int mcount = fixed.mcount;
247         N = mcount * sizeof(unsigned int);
248         unsigned int listmid[mcount];
249         ptr = (char *) listmid;
250         sum = 0;
251         do {
252                 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
253                 sum += n;
254         } while(sum < N && n != 0);
255
256         /* Read oid and version tuples for those objects that are not modified in the transaction */
257         int numread = fixed.numread;
258         N = numread * (sizeof(unsigned int) + sizeof(short));
259         char objread[N];
260         if(numread != 0) { //If pile contains more than one object to be read, 
261                           // keep reading all objects
262                 sum = 0;
263                 do {
264                         n = recv((int)acceptfd, (void *) objread, N, 0);
265                         sum += n;
266                 } while(sum < N && n != 0);
267         }
268         
269         /* Read modified objects */
270         if(fixed.nummod != 0) { // If pile contains more than one modified object,
271                                 // allocate new object store and recv all modified objects
272                                 // TODO deallocate this space
273                 pthread_mutex_lock(&mainobjstore_mutex);
274                 if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
275                         printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
276                         pthread_mutex_unlock(&mainobjstore_mutex);
277                         return 1;
278                 }
279                 pthread_mutex_unlock(&mainobjstore_mutex);
280                 sum = 0;
281                 do { // Recv the objs that are modified by the Coordinator
282                         n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
283                         sum += n;
284                 } while (sum < fixed.sum_bytes && n != 0);
285         }
286
287         /* Create an array of oids for modified objects */
288         oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
289         if (oidmod == NULL)
290         {
291                 printf("calloc error %s, %d\n", __FILE__, __LINE__);
292                 return 1;
293         }
294         ptr = (char *) modptr;
295         for(i = 0 ; i < fixed.nummod; i++) {
296           int tmpsize;
297           headaddr = (objheader_t *) ptr;
298           oid = OID(headaddr);
299           oidmod[i] = oid;
300           GETSIZE(tmpsize, headaddr);
301           ptr += sizeof(objheader_t) + tmpsize;
302         }
303         
304         /*Process the information read */
305         if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
306                 printf("Error in processClientReq %s, %d\n", __FILE__, __LINE__);
307                 /* Free resources */
308                 if(oidmod != NULL) {
309                         free(oidmod);
310                         oidmod = NULL;
311                 }
312                 return 1;
313         }
314
315         /* Free resources */
316         if(oidmod != NULL) {
317                 free(oidmod);
318                 oidmod = NULL;
319         }
320
321         return 0;
322 }
323
324 /* This function processes the Coordinator's transaction request using "handleTransReq" 
325  * function and sends a reply to the co-ordinator.
326  * Following this it also receives a new control message from the co-ordinator and processes this message*/
327 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
328                 unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
329         char *ptr, control, sendctrl;
330         objheader_t *tmp_header;
331         void *header;
332         int  i = 0, val, retval;
333
334         /* Send reply to the Coordinator */
335         if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
336                 printf("Handle Trans Req error %s, %d\n", __FILE__, __LINE__);
337                 return 1;
338         }
339
340         /* Read new control message from Coordiator */
341         if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
342                 perror("Error in receiving control message\n");
343                 return 1;
344         }
345
346         /* Process the new control message */
347         switch(control) {
348                 case TRANS_ABORT:
349                         /* Set all ref counts as 1 and do garbage collection */
350                         ptr = modptr;
351                         for(i = 0; i< fixed->nummod; i++) {
352                           int tmpsize;
353                           tmp_header = (objheader_t *)ptr;
354                           tmp_header->rcount = 0;
355                           GETSIZE(tmpsize, tmp_header);
356                           ptr += sizeof(objheader_t) + tmpsize;
357                         }
358                         /* Unlock objects that was locked due to this transaction */
359                         for(i = 0; i< transinfo->numlocked; i++) {
360                                 header = mhashSearch(transinfo->objlocked[i]);// find the header address
361                                 STATUS(((objheader_t *)header)) &= ~(LOCK);             
362                         }
363
364                         /* Send ack to Coordinator */
365                         sendctrl = TRANS_SUCESSFUL;
366                         if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
367                                 perror("Error sending ACK to coordinator\n");
368                                 if (transinfo->objlocked != NULL) {
369                                         free(transinfo->objlocked);
370                                         transinfo->objlocked = NULL;
371                                 }
372                                 if (transinfo->objnotfound != NULL) {
373                                         free(transinfo->objnotfound);
374                                         transinfo->objnotfound = NULL;
375                                 }
376
377                                 return 1;
378                         }
379                         ptr = NULL;
380                         break;
381
382                 case TRANS_COMMIT:
383                         /* Invoke the transCommit process() */
384                         if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
385                                 printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
386                                 /* Free memory */
387                                 printf("DEBUG -> Freeing...\n");
388                                 fflush(stdout);
389                                 if (transinfo->objlocked != NULL) {
390                                         free(transinfo->objlocked);
391                                         transinfo->objlocked = NULL;
392                                 }
393                                 if (transinfo->objnotfound != NULL) {
394                                         free(transinfo->objnotfound);
395                                         transinfo->objnotfound = NULL;
396                                 }
397                                 return 1;
398                         }
399                         break;
400
401                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
402                         //TODO expect another transrequest from client
403                         printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
404                         break;
405                 default:
406                         printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
407                         //TODO Use fixed.trans_id  TID since Client may have died
408                         break;
409         }
410
411         /* Free memory */
412         printf("DEBUG -> Freeing...\n");
413         fflush(stdout);
414
415         if (transinfo->objlocked != NULL) {
416                 free(transinfo->objlocked);
417                 transinfo->objlocked = NULL;
418         }
419         if (transinfo->objnotfound != NULL) {
420                 free(transinfo->objnotfound);
421                 transinfo->objnotfound = NULL;
422         }
423
424         return 0;
425 }
426
427 /* This function increments counters while running a voting decision on all objects involved 
428  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
429 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
430         int val, i = 0;
431         short version;
432         char control = 0, *ptr;
433         unsigned int oid;
434         unsigned int *oidnotfound, *oidlocked;
435         void *mobj;
436         objheader_t *headptr;
437
438         /* Counters and arrays to formulate decision on control message to be sent */
439         oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
440         oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
441         int objnotfound = 0, objlocked = 0;
442         int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
443
444         /* modptr points to the beginning of the object store 
445          * created at the Pariticipant. 
446          * Object store holds the modified objects involved in the transaction request */ 
447         ptr = (char *) modptr;
448         
449         /* Process each oid in the machine pile/ group per thread */
450         for (i = 0; i < fixed->numread + fixed->nummod; i++) {
451                 if (i < fixed->numread) {//Objs only read and not modified
452                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
453                         incr *= i;
454                         oid = *((unsigned int *)(objread + incr));
455                         incr += sizeof(unsigned int);
456                         version = *((short *)(objread + incr));
457                 } else {//Objs modified
458                   int tmpsize;
459                   headptr = (objheader_t *) ptr;
460                   oid = OID(headptr);
461                   version = headptr->version;
462                   GETSIZE(tmpsize, headptr);
463                   ptr += sizeof(objheader_t) + tmpsize;
464                 }
465                 
466                 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
467
468                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
469                         /* Save the oids not found and number of oids not found for later use */
470                         oidnotfound[objnotfound] = oid;
471                         objnotfound++;
472                 } else { /* If Obj found in machine (i.e. has not moved) */
473                         /* Check if Obj is locked by any previous transaction */
474                         if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {           
475                                 if (version == ((objheader_t *)mobj)->version) {      /* If not locked then match versions */
476                                         v_matchlock++;
477                                 } else {/* If versions don't match ...HARD ABORT */
478                                         v_nomatch++;
479                                         /* Send TRANS_DISAGREE to Coordinator */
480                                         control = TRANS_DISAGREE;
481                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
482                                                 perror("Error in sending control to the Coordinator\n");
483                                                 return 0;
484                                         }
485                                         return control;
486                                 }
487                         } else {/* If Obj is not locked then lock object */
488                                 STATUS(((objheader_t *)mobj)) |= LOCK;
489                                
490                                 /*TESTING Add random wait to make transactions run for a long time such that
491                                  * we can test for soft abort case */
492                         
493                                 //randomdelay();
494
495                                 /* Save all object oids that are locked on this machine during this transaction request call */
496                                 oidlocked[objlocked] = OID(((objheader_t *)mobj));
497                                 objlocked++;
498                                 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
499                                         v_matchnolock++;
500                                 } else { /* If versions don't match ...HARD ABORT */
501                                         v_nomatch++;
502                                         control = TRANS_DISAGREE;
503                                         /* Send TRANS_DISAGREE to Coordinator */
504                                         if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
505                                                 perror("Error in sending control to the Coordinator\n");
506                                                 return 0;
507                                         }
508                                         if (objlocked > 0) {
509                                                 STATUS(((objheader_t *)mobj)) &= ~(LOCK);
510                                                 free(oidlocked);
511                                         }
512                                         return control;
513                                 }
514                         }
515                 }
516         }
517         
518         /* Decide what control message to send to Coordinator */
519         if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
520                                         modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
521                 printf("Error in decideCtrlMessage %s, %d\n", __FILE__, __LINE__);
522                 return 0;
523         }
524         
525         return val;
526
527 }
528 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
529  * to send to Coordinator based on the votes of oids involved in the transaction */
530 int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock, 
531                 int *v_nomatch, int *objnotfound, int *objlocked, void *modptr, 
532                 unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
533         int val;
534         char control = 0;
535         /* Condition to send TRANS_AGREE */
536         if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
537                 control = TRANS_AGREE;
538                 /* Send control message */
539                 if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
540                         perror("Error in sending control to Coordinator\n");
541                         return 0;
542                 }
543         }
544         /* Condition to send TRANS_SOFT_ABORT */
545         if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
546                 control = TRANS_SOFT_ABORT;
547                 char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
548                 *((int*)&msg[1])= *(objnotfound);
549
550                 /* Send control message */
551                 if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
552                         perror("Error in sending no of objects that are not found\n");
553                         return 0;
554                 }
555                 /* Send number of oids not found and the missing oids if objects are missing in the machine */
556                 if(*(objnotfound) != 0) { 
557                         int size = sizeof(unsigned int)* *(objnotfound);
558                         if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
559                                 perror("Error in sending objects that are not found\n");
560                                 return 0;
561                         }
562                 }
563         }
564
565         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
566          * if Participant receives a TRANS_COMMIT */
567         transinfo->objlocked = oidlocked;
568         transinfo->objnotfound = oidnotfound;
569         transinfo->modptr = modptr;
570         transinfo->numlocked = *(objlocked);
571         transinfo->numnotfound = *(objnotfound);
572         
573         return control;
574 }
575
576 /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer 
577  * addresses in lookup table and also changes version number
578  * Sends an ACK back to Coordinator */
579 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
580         objheader_t *header;
581         int i = 0, offset = 0;
582         char control;
583
584         /* Process each modified object saved in the mainobject store */
585         for(i = 0; i < nummod; i++) {
586                 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
587                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
588                         return 1;
589                 }
590                 /* Change reference count of older address and free space in objstr ?? */
591                 header->rcount = 0;
592
593                 /* Change ptr address in mhash table */
594                 mhashRemove(oidmod[i]);
595                 mhashInsert(oidmod[i], (((char *)modptr) + offset));
596                 {
597                   int tmpsize;
598                   GETSIZE(tmpsize,header);
599                   offset += sizeof(objheader_t) + tmpsize;
600                 }
601                 /* Update object version number */
602                 header = (objheader_t *) mhashSearch(oidmod[i]);
603                 header->version += 1; 
604         }
605
606         /* Unlock locked objects */
607         for(i = 0; i < numlocked; i++) {
608                 if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
609                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
610                         return 1;
611                 }
612                 STATUS(header) &= ~(LOCK);
613         }
614         //TODO Update location lookup table
615
616         /* Send ack to coordinator */
617         control = TRANS_SUCESSFUL;
618         printf("DEBUG-> TRANS_SUCESSFUL\n");
619         if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
620                 perror("Error sending ACK to coordinator\n");
621         }
622         
623         return 0;
624 }
625
626 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
627  * Looks for the objects to be prefetched in the main object store.
628  * If objects are not found then record those and if objects are found
629  * then use offset values to prefetch references to other objects */
630
631 int prefetchReq(int acceptfd) {
632   int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
633   unsigned int oid, index = 0;
634   char *ptr, buffer[PRE_BUF_SIZE];
635   void *mobj;
636   unsigned int objoid;
637   char control;
638   objheader_t * header;
639   int bytesRecvd;
640   
641   /* Repeatedly recv the oid and offset pairs sent for prefetch */
642   while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
643     count++;
644     if(length == -1)
645       break;
646     sum = 0;
647     index = sizeof(unsigned int); // Index starts with sizeof  unsigned int because the 
648     // first 4 bytes are saved to send the
649     // size of the buffer (that is computed at the end of the loop)
650     bytesRecvd = 0;
651     do {
652       bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
653                          sizeof(unsigned int) - bytesRecvd, 0);
654     } while (bytesRecvd < sizeof(unsigned int));
655     numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
656     N = numoffset * sizeof(short);
657     short offset[numoffset];
658     ptr = (char *)&offset;
659     /* Recv the offset values per oid */ 
660     do {
661       n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); 
662       sum += n; 
663     } while(sum < N && n != 0); 
664     
665     /* Process each oid */
666     if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
667       /* Save the oids not found in buffer for later use */
668       *(buffer + index) = OBJECT_NOT_FOUND;
669       index += sizeof(char);
670       memcpy(buffer+index, &oid, sizeof(unsigned int));
671       index += sizeof(unsigned int);
672     } else { /* If Obj found in machine (i.e. has not moved) */
673       /* send the oid, it's size, it's header and data */
674       header = mobj;
675       GETSIZE(size, header);
676       size += sizeof(objheader_t);
677       *(buffer + index) = OBJECT_FOUND;
678       index += sizeof(char);
679       memcpy(buffer+index, &oid, sizeof(unsigned int));
680       index += sizeof(unsigned int);
681       memcpy(buffer+index, &size, sizeof(int));
682       index += sizeof(int);
683       memcpy(buffer + index, header, size);
684       index += size;
685       /* Calculate the oid corresponding to the offset value */
686       for(i = 0 ; i< numoffset ; i++) {
687         objoid = *((int *)(((char *)header) + sizeof(objheader_t) + offset[i]));
688         if((header = mhashSearch(objoid)) == NULL) {
689           /* Obj not found, send oid */
690           *(buffer + index) = OBJECT_NOT_FOUND;
691           index += sizeof(char);
692           memcpy(buffer+index, &oid, sizeof(unsigned int));
693           index += sizeof(unsigned int);
694           break;
695         } else {/* Obj Found */
696           /* send the oid, it's size, it's header and data */
697           GETSIZE(size, header);
698           size+=sizeof(objheader_t);
699           *(buffer + index) = OBJECT_FOUND;
700           index += sizeof(char);
701           memcpy(buffer+index, &oid, sizeof(unsigned int));
702           index += sizeof(unsigned int);
703           memcpy(buffer+index, &size, sizeof(int));
704           index += sizeof(int);
705           memcpy(buffer + index, header, size);
706           index += size;
707           continue;
708         }
709       }
710     }
711     /* Check for overflow in the buffer */
712     if (index >= PRE_BUF_SIZE) {
713       printf("Char buffer is overflowing\n");
714       return 1;
715     }
716     /* Send Prefetch response control message only once*/
717     if(count == 1) {
718       control = TRANS_PREFETCH_RESPONSE;
719       if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
720         perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
721         return 1;
722       }
723     }
724     
725     /* Add the buffer size into buffer as a parameter */
726     *((unsigned int *)buffer)=index;
727     /* Send the entire buffer with its size and oids found and not found */
728     if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) {
729       perror("Error sending oids found\n");
730       return 1;
731     }
732   }
733   return 0;
734 }
735