59e4aea609f7bb82232e604fcec93bcb4f4e202e
[IRC.git] / Robust / src / Runtime / DSTM / interface_recovery / dstmserver.c
/* Coordinator => Machine that initiates the transaction request call for committing a transaction
 * Participant => Machines that host the objects involved in a transaction commit */
3
#include <errno.h>
#include <netinet/tcp.h>
#include <ip.h>
#include <sched.h>
#include <stdint.h>
#include "dstm.h"
#include "mlookup.h"
#include "llookup.h"
#include "threadnotify.h"
#include "prefetch.h"
#ifdef COMPILER
#include "thread.h"
#endif
#include "gCollect.h"
16
17 #ifdef RECOVERY
18 #include <unistd.h>
19 #include <signal.h>
20 #include "tlookup.h"
21 #endif
22
23 #define BACKLOG 10 //max pending connections
24 #define RECEIVE_BUFFER_SIZE 2048
25
26 extern int classsize[];
27 extern int numHostsInSystem;
28 extern pthread_mutex_t notifymutex;
29
30 extern unsigned int myIpAddr;
31 extern unsigned int *hostIpAddrs;
32
33 #ifdef RECOVERY
34 extern unsigned int *locateObjHosts;
35 extern int *liveHosts;
36 extern int liveHostsValid;
37 extern int numLiveHostsInSystem;
38 int clearNotifyListFlag;
39 #endif
40
41 objstr_t *mainobjstore;
42 pthread_mutex_t mainobjstore_mutex;
43 pthread_mutex_t lockObjHeader;
44 pthread_mutex_t clearNotifyList_mutex;
45 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
46
47 sockPoolHashTable_t *transPResponseSocketPool;
48 extern sockPoolHashTable_t *transRequestSockPool;
49 extern sockPoolHashTable_t *transReadSockPool;
50
51 int failFlag = 0; //debug
52
53 #ifdef RECOVERY
54 /******************************
55  * Global variables for Paxos
56  ******************************/
57 extern int n_a;
58 extern unsigned int v_a;
59 extern int n_h;
60 extern int my_n;
61 extern int leader;
62 extern int paxosRound;
63 /* This function initializes the main objects store and creates the
64  * global machine and location lookup table */
65 #endif
66
67 int dstmInit(void) {
68   mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
69   /* Initialize attribute for mutex */
70   pthread_mutexattr_init(&mainobjstore_mutex_attr);
71   pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
72   pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
73   pthread_mutex_init(&lockObjHeader,NULL);
74
75 #ifdef RECOVERY
76         pthread_mutex_init(&liveHosts_mutex, NULL);
77         pthread_mutex_init(&leaderFixing_mutex, NULL);
78   pthread_mutex_init(&clearNotifyList_mutex,NULL);
79 #endif
80
81   if (mhashCreate(MHASH_SIZE, MLOADFACTOR))
82     return 1;             //failure
83
84   if (lhashCreate(HASH_SIZE, LOADFACTOR))
85     return 1;             //failure
86
87 #ifdef RECOVERY
88   if (thashCreate(THASH_SIZE, LOADFACTOR))
89     return 1;
90 #endif
91
92   if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
93     return 1;             //failure
94
95   //Initialize socket pool
96   if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, DEFAULTSOCKPOOLSIZE)) == NULL) {
97     printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
98     return 0;
99   }
100
101   return 0;
102 }
103
104 int startlistening() {
105   int listenfd;
106   struct sockaddr_in my_addr;
107   socklen_t addrlength = sizeof(struct sockaddr);
108   int setsockflag=1;
109
110   listenfd = socket(AF_INET, SOCK_STREAM, 0);
111   if (listenfd == -1) {
112     perror("socket");
113     exit(1);
114   }
115
116   if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof (setsockflag)) < 0) {
117     perror("socket");
118     exit(1);
119   }
120 #ifdef MAC
121   if (setsockopt(listenfd, SOL_SOCKET, SO_NOSIGPIPE, &setsockflag, sizeof (setsockflag)) < 0) {
122     perror("socket");
123     exit(1);
124   }
125 #endif
126
127   my_addr.sin_family = AF_INET;
128   my_addr.sin_port = htons(LISTEN_PORT);
129   my_addr.sin_addr.s_addr = INADDR_ANY;
130   memset(&(my_addr.sin_zero), '\0', 8);
131
132   if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1) {
133     perror("bind");
134     exit(1);
135   }
136
137   if (listen(listenfd, BACKLOG) == -1) {
138     perror("listen");
139     exit(1);
140   }
141   return listenfd;
142 }
143
144 /* This function starts the thread to listen on a socket
145  * for tranaction calls */
146 void *dstmListen(void *lfd) {
147   int listenfd=(int)lfd;
148   int acceptfd;
149   struct sockaddr_in client_addr;
150   socklen_t addrlength = sizeof(struct sockaddr);
151   pthread_t thread_dstm_accept;
152
153 #ifdef RECOVERY
154   int firsttime = 1;
155   pthread_t thread_dstm_asking;
156 #endif
157 #ifdef DEBUG
158   printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
159 #endif
160   while(1) {
161     int retval;
162     int flag=1;
163     acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
164
165 #ifdef RECOVERY
166     if(firsttime) {
167       do {
168         retval = pthread_create(&thread_dstm_asking, NULL, startAsking, NULL);
169       }while(retval!=0);
170       firsttime=0;
171       pthread_detach(thread_dstm_asking);
172     }
173 #endif
174 #ifdef debug
175     printf("%s -> fd accepted\n",__func__);
176 #endif
177
178     setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
179     do {
180         retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
181     } while(retval!=0);
182     pthread_detach(thread_dstm_accept);
183   }
184 }
185
186 #ifdef RECOVERY
187 void* startAsking()
188 {
189   unsigned int deadMachineIndex = -1;
190   int i;
191   int validHost;
192   int *socklist;
193   int sd;
194 #ifdef DEBUG
195   printf("%s -> Entering\n",__func__);
196 #endif
197
198     socklist = (int*) calloc(numHostsInSystem,sizeof(int)); 
199
200     for(i = 0; i< numHostsInSystem;i++) { // for 1
201         if((sd = getSockWithLock(transPResponseSocketPool,hostIpAddrs[i])) < 0) {
202           printf("%s -> Cannot create socket connection to [%s]\n",__func__,midtoIPString(hostIpAddrs[i]));
203           socklist[i] = -1;
204         }
205         else { // else 1
206           socklist[i] = sd;
207         }   // end of else 1
208     }
209   
210     while(1) {
211
212      deadMachineIndex = checkIfAnyMachineDead(socklist);
213
214       // free socket of dead machine
215       if(deadMachineIndex >= 0) { // if 2
216 #ifdef DEBUG
217         printf("%s -> Dead Machine : %s\n",__func__, midtoIPString(hostIpAddrs[deadMachineIndex]));
218 #endif
219         restoreDuplicationState(hostIpAddrs[deadMachineIndex]);
220         freeSockWithLock(transPResponseSocketPool, hostIpAddrs[deadMachineIndex], socklist[deadMachineIndex]);
221         socklist[deadMachineIndex] = -1;
222       } // end of if 2
223     } // end of while 1
224 #ifdef DEBUG
225    printf("%s -> Exiting\n",__func__);
226 #endif
227 }
228
229
230 unsigned int checkIfAnyMachineDead(int* socklist)
231 {
232   int timeout = 0;
233   int i;
234   char control = RESPOND_LIVE;
235   char response;
236 #ifdef DEBUG
237   printf("%s -> Entering\n",__func__);
238 #endif
239   
240   while(1){
241     for(i = 0; i< numHostsInSystem;i++) {
242 #ifdef DEBUG
243       printf("%s -> socklist[%d] = %d\n",__func__,i,socklist[i]);
244 #endif
245       if(socklist[i] > 0) {
246         send_data(socklist[i], &control,sizeof(char));
247
248         if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
249           // if machine is dead, returns index of socket
250 #ifdef DEBUG
251           printf("%s -> Machine dead detecteed\n",__func__);
252 #endif
253           return i;
254         }
255         else {
256           // machine responded
257           if(response != LIVE) {
258 #ifdef DEBUG
259             printf("%s -> Machine dead detected\n",__func__);
260 #endif
261             return i;
262           }
263         } // end else
264       }// end if(socklist[i]
265     } // end for()
266
267     clearDeadThreadsNotification();
268
269     sleep(numLiveHostsInSystem);  // wait for seconds for next checking
270   } // end while(1)
271 }
272 #endif
273
274
275 /* This function accepts a new connection request, decodes the control message in the connection
276  * and accordingly calls other functions to process new requests */
277 void *dstmAccept(void *acceptfd) {
278   int val, retval, size, sum, sockid, sd = 0;
279   unsigned int oid;
280   char *buffer;
281         char control,ctrl, response;
282         char *ptr;
283         void *srcObj;
284         void *dupeptr;
285         int i, tempsize;
286         objheader_t *h;
287         trans_commit_data_t transinfo;
288   unsigned short objType, *versionarry, version;
289         unsigned int *oidarry, numoid, mid, threadid;
290   int n, v;
291   unsigned int transIDreceived;
292   char decision;
293   struct sockaddr_in remoteAddr;
294
295 #ifdef DEBUG
296         printf("%s-> Entering dstmAccept\n", __func__); fflush(stdout);
297 #endif
298         /* Receive control messages from other machines */
299         while(1) {
300                 int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
301     dupeptr = NULL;
302
303                 if (ret==0)
304                         break;
305                 if (ret==-1) {
306 #ifdef DEBUG
307                         printf("DEBUG -> RECV Error!.. retrying\n");
308 #endif
309         //              exit(0);
310                         break;
311                 }
312 #ifdef DEBUG
313                 printf("%s-> dstmAccept control = %d\n", __func__, (int)control);
314 #endif
315                 switch(control) {
316                         case READ_REQUEST:
317 #ifdef DEBUG
318         printf("control -> READ_REQUEST\n");
319 #endif
320                                 /* Read oid requested and search if available */
321                                 recv_data((int)acceptfd, &oid, sizeof(unsigned int));
322                                 while((srcObj = mhashSearch(oid)) == NULL) {
323                                         int ret;
324                                         if((ret = sched_yield()) != 0) {
325                                                 printf("%s(): error no %d in thread yield\n", __func__, errno);
326                                         }
327                                 }
328                                 h = (objheader_t *) srcObj;
329                                 GETSIZE(size, h);
330                                 size += sizeof(objheader_t);
331                                 sockid = (int) acceptfd;
332                                 if (h == NULL) {
333                                         ctrl = OBJECT_NOT_FOUND;
334                                         send_data(sockid, &ctrl, sizeof(char));
335                                 } else {
336                                         // Type
337                                         char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
338                                         *((int *)&msg[1])=size;
339                                         send_data(sockid, &msg, sizeof(msg));
340                                         send_data(sockid, h, size);
341                                 }
342                                 break;
343
344                         case READ_MULT_REQUEST:
345                                 break;
346
347                         case MOVE_REQUEST:
348                                 break;
349
350                         case MOVE_MULT_REQUEST:
351                                 break;
352
353                         case TRANS_REQUEST:
354 #ifdef DEBUG
355         printf("control -> TRANS_REQUEST\n");
356 #endif
357                                 /* Read transaction request */
358                                 transinfo.objlocked = NULL;
359                                 transinfo.objnotfound = NULL;
360                                 transinfo.modptr = NULL;
361                                 transinfo.numlocked = 0;
362                                 transinfo.numnotfound = 0;
363                                 if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
364                                         printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
365                                         pthread_exit(NULL);
366                                 }
367                                 break;
368 #ifdef RECOVERY
369       case ASK_COMMIT :
370
371         recv_data((int)acceptfd, &transIDreceived, sizeof(unsigned int));
372
373         decision = checkDecision(transIDreceived);
374
375         send_data((int)acceptfd,&decision,sizeof(char));
376
377         break;
378 #endif
379                         case TRANS_PREFETCH:
380 #ifdef DEBUG
381         printf("control -> TRANS_PREFETCH\n");
382 #endif
383 #ifdef RANGEPREFETCH
384                                 if((val = rangePrefetchReq((int)acceptfd)) != 0) {
385                                         printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
386                                         break;
387                                 }
388 #else
389                                 if((val = prefetchReq((int)acceptfd)) != 0) {
390                                         printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
391                                         break;
392                                 }
393 #endif
394                                 break;
395
396                         case TRANS_PREFETCH_RESPONSE:
397 #ifdef DEBUG
398                 printf("control -> TRANS_PREFETCH_RESPONSE\n");
399 #endif
400 #ifdef RANGEPREFETCH
401                                 if((val = getRangePrefetchResponse((int)acceptfd)) != 0) {
402                                         printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
403                                         break;
404                                 }
405 #else
406                                 if((val = getPrefetchResponse((int) acceptfd)) != 0) {
407                                         printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
408                                         break;
409                                 }
410 #endif
411                                 break;
412
413                         case START_REMOTE_THREAD:
414 #ifdef DEBUG
415         printf("control -> START_REMOTE_THREAD\n");
416 #endif
417                                 recv_data((int)acceptfd, &oid, sizeof(unsigned int));
418                                 objType = getObjType(oid);
419                                 startDSMthread(oid, objType);
420                                 break;
421
422       case THREAD_NOTIFY_REQUEST:
423 #ifdef DEBUG
424         printf("control -> THREAD_NOTIFY_REQUEST FD : %d\n",acceptfd);
425 #endif
426         numoid = 0;
427                                 recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
428       
429                                 size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
430                                 if((buffer = calloc(1,size)) == NULL) {
431                                         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
432                                         pthread_exit(NULL);
433                                 }
434                 
435                                 recv_data((int)acceptfd, buffer, size);
436
437                                 oidarry = calloc(numoid, sizeof(unsigned int));
438                                 memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
439                                 size = sizeof(unsigned int) * numoid;
440                                 versionarry = calloc(numoid, sizeof(unsigned short));
441                                 memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
442                                 size += sizeof(unsigned short) * numoid;
443                                 mid = *((unsigned int *)(buffer+size));
444                                 size += sizeof(unsigned int);
445                                 threadid = *((unsigned int *)(buffer+size));
446                                 processReqNotify(numoid, oidarry, versionarry, mid, threadid);
447                                 free(buffer);
448
449                                 break;
450
451                         case THREAD_NOTIFY_RESPONSE:
452 #ifdef DEBUG
453         printf("control -> THREAD_NOTIFY_RESPONSE\n");
454 #endif
455                                 size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
456                                 if((buffer = calloc(1,size)) == NULL) {
457                                         printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
458                                         pthread_exit(NULL);
459                                 }
460
461                                 recv_data((int)acceptfd, buffer, size);
462
463
464                                 oid = *((unsigned int *)buffer);
465                                 size = sizeof(unsigned int);
466                                 version = *((unsigned short *)(buffer+size));
467                                 size += sizeof(unsigned short);
468                                 threadid = *((unsigned int *)(buffer+size));
469                                 threadNotify(oid,version,threadid);
470                                 free(buffer);
471                                 break;
472 #ifdef RECOVERY
473       case CLEAR_NOTIFY_LIST:
474 #ifdef DEBUG
475         printf("control -> CLEAR_NOTIFY_LIST\n");
476 #endif  
477         size = sizeof(unsigned int);
478         if((buffer = calloc(1,size)) == NULL) {
479           printf("%s() Caclloc error at CLEAR_NOTIFY_LIST\n");
480           pthread_exit(NULL);
481         }
482
483         recv_data((int)acceptfd,buffer, size);
484
485         oid = *((unsigned int *)buffer);
486
487         pthread_mutex_lock(&clearNotifyList_mutex);
488         if(clearNotifyListFlag == 0) {
489           clearNotifyListFlag = 1;
490           pthread_mutex_unlock(&clearNotifyList_mutex);
491           clearNotifyList(oid);
492         }
493         else {
494           pthread_mutex_unlock(&clearNotifyList_mutex);
495         }
496         free(buffer);
497         break;
498 #endif
499                         case CLOSE_CONNECTION:
500 #ifdef DEBUG
501         printf("control -> CLOSE_CONNECTION\n");
502 #endif
503                                 goto closeconnection;
504
505 #ifdef RECOVERY
506                         case RESPOND_LIVE:
507 #ifdef DEBUG
508         printf("control -> RESPOND_LIVE\n");
509 #endif
510                                 liveHostsValid = 0;
511                                 ctrl = LIVE;
512                                 send_data((int)acceptfd, &ctrl, sizeof(ctrl));
513 #ifdef DEBUG
514                                 printf("%s (RESPOND_LIVE)-> Sending LIVE!\n", __func__);
515 #endif
516                                 break;
517 #endif
518 #ifdef RECOVERY
519                         case REMOTE_RESTORE_DUPLICATED_STATE:
520 #ifdef DEBUG
521         printf("control -> REMOTE_RESTORE_DUPLICATED_STATE\n");
522 #endif
523                                 recv_data((int)acceptfd, &mid, sizeof(unsigned int));
524                                 if(!liveHosts[findHost(mid)]) {
525 #ifdef DEBUG
526           printf("%s (REMOTE_RESTORE_DUPLICATED_STATE) -> already fixed\n",__func__);
527 #endif
528                                         break;
529         }
530                                 pthread_mutex_lock(&leaderFixing_mutex);
531                                 if(!leaderFixing) {
532                                         leaderFixing = 1;
533                                         pthread_mutex_unlock(&leaderFixing_mutex);
534                                         // begin fixing
535                                         updateLiveHosts();
536                                         duplicateLostObjects(mid);
537                                 if(updateLiveHostsCommit() != 0) {
538                                         printf("error updateLiveHostsCommit()\n");
539                                         exit(1);
540                                 }
541
542         // finish fixing
543                                 pthread_mutex_lock(&leaderFixing_mutex);
544                                 leaderFixing = 0;
545                                 pthread_mutex_unlock(&leaderFixing_mutex);
546                                 }
547                                 else {
548                                         pthread_mutex_unlock(&leaderFixing_mutex);
549 #ifdef DEBUG
550           printf("%s (REMOTE_RESTORE_DUPLICATED_STATE -> LEADER is already fixing\n",__func__);
551 #endif
552           sleep(WAIT_TIME);
553                                 }
554                                 break;
555 #endif
556 #ifdef RECOVERY
557                         case UPDATE_LIVE_HOSTS:
558 #ifdef DEBUG
559         printf("control -> UPDATE_LIVE_HOSTS\n");
560 #endif
561                                 // copy back
562                                 pthread_mutex_lock(&liveHosts_mutex);
563                           recv_data((int)acceptfd, liveHosts, sizeof(int)*numHostsInSystem);
564                                 recv_data((int)acceptfd, locateObjHosts, sizeof(unsigned int)*numHostsInSystem*2);
565                                 pthread_mutex_unlock(&liveHosts_mutex);
566                                 liveHostsValid = 1;
567                                 numLiveHostsInSystem = getNumLiveHostsInSystem();
568 #ifdef DEBUG
569                                 printHostsStatus();
570                           printf("%s (UPDATE_LIVE_HOSTS)-> Finished\n", __func__);      
571 #endif
572                                 //exit(0);
573                                 break;
574 #endif
575
576 #ifdef RECOVERY
577                         case DUPLICATE_ORIGINAL:
578 #ifdef DEBUG
579         printf("control -> DUPLICATE_ORIGINAL\n");
580                                 printf("%s (DUPLICATE_ORIGINAL)-> Attempt to duplicate original objects\n", __func__);  
581 #endif
582                                 //object store stuffffff
583                                 recv_data((int)acceptfd, &mid, sizeof(unsigned int));
584                                 tempsize = mhashGetDuplicate(&dupeptr, 0);
585
586                                 //send control and dupes after
587                                 ctrl = RECEIVE_DUPES;
588
589         if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
590           perror("ORIGINAL : ");
591           exit(0);
592         }
593
594         bzero(&remoteAddr, sizeof(remoteAddr));
595         remoteAddr.sin_family = AF_INET;
596         remoteAddr.sin_port = htons(LISTEN_PORT);
597         remoteAddr.sin_addr.s_addr = htonl(mid);
598
599         if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
600           printf("ORIGINAL ERROR : %s\n",strerror(errno));
601           exit(0);
602         }
603         else {
604                                 send_data(sd, &ctrl, sizeof(char));
605                                 send_data(sd, dupeptr, tempsize);
606
607                                 recv_data(sd, &response, sizeof(char));
608 #ifdef DEBUG
609           printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
610 #endif
611                                 if(response != DUPLICATION_COMPLETE) {
612 #ifdef DEBUG
613            printf("%s(DUPLICATION_ORIGINAL) -> DUPLICATION FAIL\n",__func__);
614 #endif
615                                   //fail message
616            exit(0);
617                                 }
618
619           close(sd);
620         }
621         free(dupeptr);
622
623         ctrl = DUPLICATION_COMPLETE;
624                                 send_data((int)acceptfd, &ctrl, sizeof(char));
625 #ifndef DEBUG
626                                 printf("%s (DUPLICATE_ORIGINAL)-> Finished\n", __func__);       
627 #endif
628                                 break;
629
630                         case DUPLICATE_BACKUP:
631 #ifndef DEBUG
632         printf("control -> DUPLICATE_BACKUP\n");
633                                 printf("%s (DUPLICATE_BACKUP)-> Attempt to duplicate backup objects\n", __func__);
634 #endif
635                                 //object store stuffffff
636                                 recv_data((int)acceptfd, &mid, sizeof(unsigned int));
637
638
639                                 tempsize = mhashGetDuplicate(&dupeptr, 1);
640
641                                 //send control and dupes after
642                                 ctrl = RECEIVE_DUPES;
643
644         if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
645           perror("BACKUP : ");
646           exit(0);
647         }
648
649          bzero(&remoteAddr, sizeof(remoteAddr));                                       
650          remoteAddr.sin_family = AF_INET;                                              
651          remoteAddr.sin_port = htons(LISTEN_PORT);                                     
652          remoteAddr.sin_addr.s_addr = htonl(mid);                                      
653                                                                                        
654          if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {       
655            printf("BACKUP ERROR : %s\n",strerror(errno));
656            exit(0);
657          }                                                                             
658          else {                                                                        
659           send_data(sd, &ctrl, sizeof(char));
660                                 send_data(sd, dupeptr, tempsize);
661           
662           recv_data(sd, &response, sizeof(char));
663 #ifdef DEBUG
664           printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
665 #endif
666                                 if(response != DUPLICATION_COMPLETE) {
667 #ifndef DEBUG
668             printf("%s(DUPLICATION_BACKUP) -> DUPLICATION FAIL\n",__func__);
669 #endif
670             exit(0);
671           }
672
673           close(sd);
674          }
675
676         free(dupeptr);
677
678                                 ctrl = DUPLICATION_COMPLETE;
679                                 send_data((int)acceptfd, &ctrl, sizeof(char));
680 #ifndef DEBUG
681                                 printf("%s (DUPLICATE_BACKUP)-> Finished\n", __func__); 
682 #endif
683                                 
684                                 break;
685
686                         case RECEIVE_DUPES:
687 #ifndef DEBUG
688         printf("control -> RECEIVE_DUPES sd : %d\n",(int)acceptfd);
689 #endif
690                                 if((readDuplicateObjs((int)acceptfd)) != 0) {
691                                         printf("Error: In readDuplicateObjs() %s, %d\n", __FILE__, __LINE__);
692                                         pthread_exit(NULL);
693                                 }
694
695                                 ctrl = DUPLICATION_COMPLETE;
696                                 send_data((int)acceptfd, &ctrl, sizeof(char));
697 #ifndef DEBUG
698         printf("%s (RECEIVE_DUPES) -> Finished\n",__func__);
699 #endif
700                                 break;
701 #endif
702 #ifdef RECOVERY
703                         case PAXOS_PREPARE:
704 #ifdef DEBUG
705         printf("control -> PAXOS_PREPARE\n");
706 #endif
707                                 recv_data((int)acceptfd, &val, sizeof(int));
708                                 if (val <= n_h) {
709                                         control = PAXOS_PREPARE_REJECT;
710                                         send_data((int)acceptfd, &control, sizeof(char));
711                                 }
712                                 else {
713                                         n_h = val;
714                                         control = PAXOS_PREPARE_OK;
715                     
716                                         send_data((int)acceptfd, &control, sizeof(char));
717                                         send_data((int)acceptfd, &n_a, sizeof(int));
718                                         send_data((int)acceptfd, &v_a, sizeof(int));
719                                 }
720                                 break;
721
722                         case PAXOS_ACCEPT:
723 #ifdef DEBUG
724         printf("control -> PAXOS_ACCEPT\n");
725 #endif
726                                 recv_data((int)acceptfd, &n, sizeof(int));
727                                 recv_data((int)acceptfd, &v, sizeof(int));
728                                 if (n < n_h) {
729                                         control = PAXOS_ACCEPT_REJECT;
730                                         send_data((int)acceptfd, &control, sizeof(char));
731                                 }
732                                 else {
733                                         n_a = n;
734                                         v_a = v;
735                                         n_h = n;
736                                         control = PAXOS_ACCEPT_OK;
737                                         send_data((int)acceptfd, &control, sizeof(char));
738                                 }
739                                 break;
740
741                         case PAXOS_LEARN:
742 #ifdef DEBUG
743         printf("control -> PAXOS_LEARN\n");
744 #endif
745                                 recv_data((int)acceptfd, &v, sizeof(int));
746                                 leader = v_a;
747                                 paxosRound++;
748 #ifdef DEBUG
749                                 printf("%s (PAXOS_LEARN)-> This is my leader!: [%s]\n", __func__, midtoIPString(leader));
750 #endif
751                                 break;
752
753                         case DELETE_LEADER:
754 #ifdef DEBUG
755         printf("control -> DELETE_LEADER\n");
756 #endif
757                                 v_a = 0;
758                                 break;
759 #endif
760                         default:
761                                 printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
762                 }
763         }
764 #ifdef DEBUG
765         printf("%s-> Exiting\n", __func__); fflush(stdout);
766 #endif
767 closeconnection:
768         /* Close connection */
769         if (close((int)acceptfd) == -1)
770                 perror("close");
771         pthread_exit(NULL);
772 }
773
774 int readDuplicateObjs(int acceptfd) {
775         int numoid, i, size, tmpsize;
776         unsigned int oid;
777         void *dupeptr, *ptrcreate, *ptr;
778         objheader_t *header;
779
780 #ifdef DEBUG
781         printf("%s-> Start\n", __func__);
782 #endif
783         recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
784         recv_data((int)acceptfd, &size, sizeof(int));   
785         // do i need array of oids?
786         // answer: no! now get to work
787         if(numoid != 0) {
788                 if ((dupeptr = calloc(1, size)) == NULL) {
789                         printf("calloc error for duplicated objects %s, %d\n", __FILE__, __LINE__);
790                         return 1;
791                 }
792
793                 recv_data((int)acceptfd, dupeptr, size);
794                 ptr = dupeptr;
795                 for(i = 0; i < numoid; i++) {
796                         header = (objheader_t *)ptr;
797                         oid = OID(header);
798                         GETSIZE(tmpsize, header);
799                         tmpsize += sizeof(objheader_t);
800
801 #ifdef DEBUG
802                         printf("%s-> oid being received/backed:%u, version:%d, type:%d\n", __func__, oid, header->version, TYPE(header));
803                         printf("STATUSPTR(header):%u, STATUS:%d\n", STATUSPTR(header), STATUS(header));
804 #endif
805
806       if(mhashSearch(oid) != NULL) {
807 #ifdef DEBUG
808         printf("%s -> oid : %d is already in there\n",__func__,oid);
809 #endif
810
811         if(header->notifylist != NULL) {
812           unsigned int *listSize = (ptr + tmpsize);
813           tmpsize += sizeof(unsigned int);
814           tmpsize += sizeof(threadlist_t) * (*listSize);
815         }
816       }
817       else {
818                         pthread_mutex_lock(&mainobjstore_mutex);
819                         if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
820                                 printf("Error: readDuplicateObjs() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
821                                 pthread_mutex_unlock(&mainobjstore_mutex);
822                                   return 1;
823                         }
824                         pthread_mutex_unlock(&mainobjstore_mutex);
825               memcpy(ptrcreate, header, tmpsize);
826
827         objheader_t* oPtr = (objheader_t*)ptrcreate;
828
829         if(oPtr->notifylist != NULL) {
830           oPtr->notifylist = NULL;  // reset for new list
831           threadlist_t *listNode;
832           unsigned int* listSize = (ptr + tmpsize);  // get number of notifylist
833           unsigned int j;
834
835           tmpsize += sizeof(unsigned int);   // skip number of notifylist 
836           listNode = (threadlist_t*)(ptr + tmpsize); // get first element of address
837           for(j = 0; j< *listSize; j++) {      // retreive all threadlist
838             oPtr->notifylist = insNode(oPtr->notifylist,listNode[j].threadid,listNode[j].mid);
839           
840           }
841           tmpsize += sizeof(threadlist_t) * (*listSize);
842
843         }
844                 mhashInsert(oid, ptrcreate);
845       }
846                 ptr += tmpsize;
847                 }
848 #ifdef DEBUG
849                 printf("%s-> End\n", __func__);
850 #endif
851     free(dupeptr);
852                 return 0;
853         }
854         else {
855 #ifdef DEBUG
856                 printf("%s-> No objects duplicated\n", __func__);
857 #endif
858                 return 0;
859         }
860 }
861
862 /* This function reads the information available in a transaction request
863  * and makes a function call to process the request */
864 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
865   char *ptr;
866   void *modptr;
867   unsigned int *oidmod, oid;
868   fixed_data_t fixed;
869   objheader_t *headaddr;
870   int sum, i, size, n, val;
871   int timeout;
872
873   oidmod = NULL;
874 #ifdef DEBUG
875         printf("%s-> Entering\n", __func__);
876 #endif
877
878   /* Read fixed_data_t data structure */
879   size = sizeof(fixed) - 1;
880   ptr = (char *)&fixed;
881   fixed.control = TRANS_REQUEST;
882   timeout = recv_data((int)acceptfd, ptr+1, size);
883
884   /* Read list of mids */
885   int mcount = fixed.mcount;
886   size = mcount * sizeof(unsigned int);
887   unsigned int listmid[mcount];
888   ptr = (char *) listmid;
889   timeout = recv_data((int)acceptfd, ptr, size);
890
891   if(timeout < 0)   // coordinator failed
892     return 0;
893
894   /* Read oid and version tuples for those objects that are not modified in the transaction */
895   int numread = fixed.numread;
896   size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
897   char objread[size];
898   if(numread != 0) { //If pile contains more than one object to be read,
899     // keep reading all objects
900     timeout = recv_data((int)acceptfd, objread, size);
901   }
902
903   /* Read modified objects */
904   if(fixed.nummod != 0) {
905     if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
906       printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
907       return 1;
908     }
909     size = fixed.sum_bytes;
910     timeout = recv_data((int)acceptfd, modptr, size);
911   }
912
913   if(timeout < 0) // coordinator failed
914     return 0;
915
916   /* Create an array of oids for modified objects */
917   oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
918   if (oidmod == NULL) {
919     printf("calloc error %s, %d\n", __FILE__, __LINE__);
920     return 1;
921   }
922   ptr = (char *) modptr;
923   for(i = 0 ; i < fixed.nummod; i++) {
924     int tmpsize;
925     headaddr = (objheader_t *) ptr;
926     oid = OID(headaddr);
927     oidmod[i] = oid;
928     GETSIZE(tmpsize, headaddr);
929     ptr += sizeof(objheader_t) + tmpsize;
930   }
931 #ifdef DEBUG
932         printf("%s-> num oid read = %d, oids modified = %d, size = %d\n", __func__, fixed.numread,  fixed.nummod, size);
933 #endif
934   /*Process the information read */
935   if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
936     printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
937     /* Free resources */
938     if(oidmod != NULL) {
939       free(oidmod);
940     }
941     return 1;
942   }
943
944   /* Free resources */
945   if(oidmod != NULL) {
946     free(oidmod);
947   }
948 #ifdef DEBUG
949         printf("%s-> Exiting\n", __func__);
950 #endif
951
952   return 0;
953 }
954
955 /* This function processes the Coordinator's transaction request using "handleTransReq"
956  * function and sends a reply to the co-ordinator.
957  * Following this it also receives a new control message from the co-ordinator and processes this message*/
958 int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
959                      unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
960
961   char control, sendctrl, retval;
962   
963   objheader_t *tmp_header;
964   void *header;
965   int i = 0, val;
966   unsigned int transID;
967 #ifdef DEBUG
968         printf("%s-> Entering\n", __func__);
969 #endif
970
971   /* receives transaction id */
972   recv_data((int)acceptfd, &transID, sizeof(unsigned int));
973
974   /* Send reply to the Coordinator */
975   if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
976     printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
977           printf("DEBUG-> Exiting processClientReq, line = %d\n", __LINE__);
978     return 1;
979   }
980
981         int timeout = recv_data((int)acceptfd, &control, sizeof(char));
982         /* Process the new control message */
983 #ifdef DEBUG
984   printf("%s -> timeout = %d   control = %d\n",__func__,timeout,control); 
985 #endif
986   
987 #ifdef RECOVERY
988   if(timeout < 0) {  // timeout. failed to receiving data from coordinator
989 #ifdef DEBUG
990     printf("%s -> timeout!! assumes coordinator is dead\n",__func__);
991 #endif
992     control = receiveDecisionFromBackup(transID,fixed->mcount,listmid);
993 #ifdef DEBUG
994     printf("%s -> received Decision %d\n",__func__,control);
995 #endif
996   }    
997   
998   /* insert received control into thash for another transaction*/
999   thashInsert(transID, control);
1000 #endif
1001   switch(control) {
1002                 case TRANS_ABORT:
1003                         if (fixed->nummod > 0)
1004                                 free(modptr);
1005                         /* Unlock objects that was locked due to this transaction */
1006                         int useWriteUnlock = 0;
1007                         for(i = 0; i< transinfo->numlocked; i++) {
1008                                 if(transinfo->objlocked[i] == -1) {
1009                                         useWriteUnlock = 1;
1010                                         continue;
1011                                 }
1012                                 if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
1013                                         printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
1014                                         printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
1015                                         return 1;
1016                                 }
1017                                 if(useWriteUnlock) {
1018                                         write_unlock(STATUSPTR(header));
1019                                 } else {
1020                                         read_unlock(STATUSPTR(header));
1021                                 }
1022                         }
1023                         break;
1024
1025                 case TRANS_COMMIT:
1026                         /* Invoke the transCommit process() */
1027                         if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
1028                                 printf("Error: In transCommitProcess() %s, %d\n", __FILE__, __LINE__);
1029                                 /* Free memory */
1030                                 if (transinfo->objlocked != NULL) {
1031                                         free(transinfo->objlocked);
1032                                 }
1033                                 if (transinfo->objnotfound != NULL) {
1034                                         free(transinfo->objnotfound);
1035                                 }
1036                                 printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
1037                                 return 1;
1038                         }
1039
1040
1041                         break;
1042
1043                 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
1044                         break;
1045
1046                 default:
1047                         printf("Error: No response to TRANS_AGREE OR DISAGREE protocol %s, %d\n", __FILE__, __LINE__);
1048                         //TODO Use fixed.trans_id  TID since Client may have died
1049                         break;
1050         }
1051
1052   /* Free memory */
1053   if (transinfo->objlocked != NULL) {
1054     free(transinfo->objlocked);
1055   }
1056   if (transinfo->objnotfound != NULL) {
1057     free(transinfo->objnotfound);
1058   }
1059 #ifdef DEBUG
1060         printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
1061 #endif
1062
1063   return 0;
1064 }
1065
#ifdef RECOVERY
/* Looks up the decision stored for transID in the transaction hash.
 * thashSearch() yielding 0 is mapped to -1 (presumably "no decision
 * recorded yet"); any other value is returned unchanged. */
char checkDecision(unsigned int transID)
{
#ifdef DEBUG
  printf("%s -> transID :  %u\n",__func__,transID);
#endif
  char decision = thashSearch(transID);
  return decision ? decision : -1;
}
#endif
1081
1082 /* This function increments counters while running a voting decision on all objects involved
1083  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
1084 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
1085   int val, i = 0, j;
1086   unsigned short version;
1087   char control = 0, *ptr;
1088   unsigned int oid;
1089   unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
1090   objheader_t *headptr;
1091
1092   /* Counters and arrays to formulate decision on control message to be sent */
1093   oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
1094   oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod + 1, sizeof(unsigned int));
1095   oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
1096   int objnotfound = 0, objlocked = 0, objvernotmatch = 0;
1097   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
1098   int numBytes = 0;
1099   /* modptr points to the beginning of the object store
1100    * created at the Pariticipant.
1101    * Object store holds the modified objects involved in the transaction request */
1102   ptr = (char *) modptr;
1103
1104   /* Process each oid in the machine pile/ group per thread */
1105   for (i = 0; i < fixed->numread + fixed->nummod; i++) {
1106     if (i < fixed->numread) { //Objs only read and not modified
1107       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
1108       incr *= i;
1109       oid = *((unsigned int *)(objread + incr));
1110       incr += sizeof(unsigned int);
1111       version = *((unsigned short *)(objread + incr));
1112 #ifdef DEBUG
1113       printf("%s -> oid : %u    version : %d\n",__func__,oid,version);
1114 #endif
1115       getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
1116                                &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
1117     } else {  //Objs modified
1118       if(i == fixed->numread) {
1119         oidlocked[objlocked++] = -1;
1120       }
1121       int tmpsize;
1122       headptr = (objheader_t *) ptr;
1123       oid = OID(headptr);
1124       version = headptr->version;
1125       GETSIZE(tmpsize, headptr);
1126       ptr += sizeof(objheader_t) + tmpsize;
1127
1128       getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
1129                               &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
1130                               &numBytes, &control, oid, version);
1131     }
1132   }
1133
1134   /* send TRANS_DISAGREE and objs*/
1135         if(v_nomatch > 0) {
1136 #ifdef CACHE
1137                 char *objs = calloc(1, numBytes);
1138                 int j, offset = 0;
1139                 for(j = 0; j<objvernotmatch; j++) {
1140                         objheader_t *header = mhashSearch(oidvernotmatch[j]);
1141                         int size = 0;
1142                         GETSIZE(size, header);
1143                         size += sizeof(objheader_t);
1144                         memcpy(objs+offset, header, size);
1145                         offset += size;
1146                 }
1147 #endif
1148                 if (objlocked > 0) {
1149                         int useWriteUnlock = 0;
1150                         for(j = 0; j < objlocked; j++) {
1151                                 if(oidlocked[j] == -1) {
1152                                         useWriteUnlock = 1;
1153                                         continue;
1154                                 }
1155                                 if((headptr = mhashSearch(oidlocked[j])) == NULL) {
1156                                         printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1157                                         return 0;
1158                                 }
1159                                 if(useWriteUnlock) {
1160                                         write_unlock(STATUSPTR(headptr));
1161                                 } else {
1162                                         read_unlock(STATUSPTR(headptr));
1163                                 }
1164                         }
1165                         free(oidlocked);
1166                 }
1167     
1168
1169 #ifdef DEBUG
1170                 printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__);
1171 #endif
1172
1173                 send_data(acceptfd, &control, sizeof(char));
1174 #ifdef CACHE
1175                 send_data(acceptfd, &numBytes, sizeof(int));
1176                 send_data(acceptfd, objs, numBytes);
1177
1178                 transinfo->objvernotmatch = oidvernotmatch;
1179                 transinfo->numvernotmatch = objvernotmatch;
1180                 free(objs);
1181                 free(transinfo->objvernotmatch);
1182 #endif
1183                 return control;
1184   }
1185
1186   /* Decide what control message to send to Coordinator */
1187   if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
1188                                    modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
1189     printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
1190     return 0;
1191   }
1192   return control;
1193 }
1194
1195 /* Update Commit info for objects that are modified */
1196 void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
1197                              unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
1198                              int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
1199                              char *control, unsigned int oid, unsigned short version) {
1200   void *mobj;
1201   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1202         //printf("version number: %d\n", version);
1203 #ifdef RECOVERY
1204   if(version == 1) {
1205                 (*v_matchnolock)++;
1206 #ifdef DEBUG
1207                 printf("%s -> *backup object* oid:%u\n", __func__,oid);
1208 #endif
1209                 return;
1210         }
1211 #endif
1212
1213         if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
1214 #ifdef DEBUG
1215                 printf("Obj not found: %s() oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1216                 fflush(stdout);
1217 #endif
1218                 /* Save the oids not found and number of oids not found for later use */
1219                 oidnotfound[*objnotfound] = oid;
1220                 (*objnotfound)++;
1221         } else {     /* If Obj found in machine (i.e. has not moved) */
1222                 /* Check if Obj is locked by any previous transaction */
1223                 if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1224 #ifdef DEBUG
1225                   printf("****%s->Trying to acquire 'remote' writelock for oid:%d, version:%d\n", __func__, oid, version);
1226                         printf("this version: %d, mlookup version: %d\n", version, ((objheader_t *)mobj)->version);
1227 #endif
1228                         if (version == ((objheader_t *)mobj)->version) { /* match versions */
1229                                 (*v_matchnolock)++;
1230                         } else { /* If versions don't match ...HARD ABORT */
1231                                 (*v_nomatch)++;
1232                                 oidvernotmatch[*objvernotmatch] = oid;
1233                                 (*objvernotmatch)++;
1234                                 int size;
1235                                 GETSIZE(size, mobj);
1236                                 size += sizeof(objheader_t);
1237                                 *numBytes += size;
1238                                 /* Send TRANS_DISAGREE to Coordinator */
1239                                 *control = TRANS_DISAGREE;
1240                                 //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1241                         }
1242                         //Keep track of oid locked
1243                         oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
1244                 } else {  //we are locked
1245                         if (version == ((objheader_t *)mobj)->version) {     /* Check if versions match */
1246                                 (*v_matchlock)++;
1247                         } else { /* If versions don't match ...HARD ABORT */
1248                                 (*v_nomatch)++;
1249                                 oidvernotmatch[*objvernotmatch] = oid;
1250                                 (*objvernotmatch)++;
1251                                 int size;
1252                                 GETSIZE(size, mobj);
1253                                 size += sizeof(objheader_t);
1254                                 *numBytes += size;
1255                                 *control = TRANS_DISAGREE;
1256                                 //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1257                         }
1258                 }
1259   }
1260 #ifdef DEBUG
1261         printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__,oid, *v_matchnolock, *v_matchlock, *v_nomatch);
1262 #endif
1263 }
1264
1265 /* Update Commit info for objects that are read */
1266 void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
1267                               int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
1268                               int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
1269   void *mobj;
1270   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1271   //printf("version number: %d\n", version);
1272 #ifdef DEBUG
1273   printf("%s -> Entering\n",__func__);
1274 #endif
1275 #ifdef RECOVERY
1276   if(version == 1) {
1277                 (*v_matchnolock)++;
1278                 printf("*backup object* oid:%u\n", oid);
1279                 return;
1280         }
1281 #endif
1282
1283         if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
1284 #ifdef DEBUG
1285     printf("%s -> Obj not found!\n",__func__);
1286           printf("%s -> Obj not found: oid = %d, type = %d\t\n", __func__,OID(mobj), TYPE((objheader_t *)mobj));
1287         fflush(stdout);
1288 #endif
1289     /* Save the oids not found and number of oids not found for later use */
1290     oidnotfound[*objnotfound] = oid;
1291     (*objnotfound)++;
1292   } else {     /* If Obj found in machine (i.e. has not moved) */
1293 #ifdef DEBUG
1294     printf("%s -> Obj found!!\n",__func__);
1295         printf("%s -> Obj found: oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1296           fflush(stdout);
1297 #endif
1298     
1299     /* Check if Obj is locked by any previous transaction */
1300     if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
1301       if (version == ((objheader_t *)mobj)->version) { /* match versions */
1302         (*v_matchnolock)++;
1303       } else { /* If versions don't match ...HARD ABORT */
1304         (*v_nomatch)++;
1305         oidvernotmatch[(*objvernotmatch)++] = oid;
1306         int size;
1307         GETSIZE(size, mobj);
1308         size += sizeof(objheader_t);
1309         *numBytes += size;
1310       
1311         /* Send TRANS_DISAGREE to Coordinator */
1312         *control = TRANS_DISAGREE;
1313       }
1314
1315       //Keep track of oid locked
1316       oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
1317     } else { /* Some other transaction has aquired a write lock on this object */
1318       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1319         (*v_matchlock)++;
1320       } else { /* If versions don't match ...HARD ABORT */
1321         (*v_nomatch)++;
1322         oidvernotmatch[*objvernotmatch] = oid;
1323         (*objvernotmatch)++;
1324         int size;
1325         GETSIZE(size, mobj);
1326         size += sizeof(objheader_t);
1327         *numBytes += size;
1328         *control = TRANS_DISAGREE;
1329       }
1330     }
1331   }
1332 #ifdef DEBUG
1333         printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__, oid, *v_matchnolock, *v_matchlock, *v_nomatch);
1334 #endif
1335 }
1336
1337 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
1338  * to send to Coordinator based on the votes of oids involved in the transaction */
1339 char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
1340                        int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
1341                        unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
1342   int val;
1343   char control = 0;
1344
1345   /* Condition to send TRANS_AGREE */
1346   if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
1347     control = TRANS_AGREE;
1348     /* Send control message */
1349 #ifdef DEBUG
1350                 printf("%s -> control = %s\n", __func__,"TRANS_AGREE");
1351 #endif
1352     send_data(acceptfd, &control, sizeof(char));
1353     
1354 #ifdef DEBUG
1355                 printf("%s -> finished sending control\n",__func__);
1356 #endif
1357   }
1358   /* Condition to send TRANS_SOFT_ABORT */
1359   else if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
1360     control = TRANS_SOFT_ABORT;
1361 #ifdef DEBUG
1362                 printf("%s -> control = %s\n", __func__,"TRANS_SOFT_ABORT");
1363 #endif
1364     /* Send control message */
1365     send_data(acceptfd, &control, sizeof(char));
1366
1367     /*  FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
1368     if(*(objnotfound) != 0) {
1369       int msg[1];
1370       msg[0] = *(objnotfound);
1371       send_data(acceptfd, &msg, sizeof(int));
1372       int size = sizeof(unsigned int)* *(objnotfound);
1373       send_data(acceptfd, oidnotfound, size);
1374     }
1375   }
1376
1377   /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1378    * if Participant receives a TRANS_COMMIT */
1379   transinfo->objlocked = oidlocked;
1380   transinfo->objnotfound = oidnotfound;
1381   transinfo->modptr = modptr;
1382   transinfo->numlocked = *(objlocked);
1383   transinfo->numnotfound = *(objnotfound);
1384   return control;
1385 }
1386
1387 /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
1388  * addresses in lookup table and also changes version number
1389  * Sends an ACK back to Coordinator */
1390 int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
1391   objheader_t *header;
1392   objheader_t *newheader;
1393   int i = 0, offset = 0;
1394   char control;
1395   int tmpsize;
1396   void *ptrcreate;
1397 #ifdef DEBUG
1398         printf("DEBUG-> Entering transCommitProcess, dstmserver.c\n");
1399         printf("nummod: %d, numlocked: %d\n", nummod, numlocked);
1400 #endif
1401
1402   /* Process each modified object saved in the mainobject store */
1403   for(i = 0; i < nummod; i++) {
1404     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1405 #ifndef RECOVERY
1406       printf("Error: mhashsearch returns NULL at dstmserver.c %d\n", __LINE__);
1407                         return 1;
1408 #else
1409 #ifdef DEBUG
1410                         printf("DEBUG->*backup* i:%d, nummod:%d\n", i, nummod);
1411 #endif
1412                         header = (objheader_t *)(modptr+offset);
1413                         header->version += 1;
1414                         header->isBackup = 1;
1415 #ifdef DEBUG
1416       printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
1417 #endif
1418                         GETSIZE(tmpsize, header);
1419                         tmpsize += sizeof(objheader_t);
1420                         pthread_mutex_lock(&mainobjstore_mutex);
1421                         if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
1422                                 printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1423                                 pthread_mutex_unlock(&mainobjstore_mutex);
1424                                 return 1;
1425                         }
1426                         pthread_mutex_unlock(&mainobjstore_mutex);
1427                         /* Initialize read and write locks  */
1428                         initdsmlocks(STATUSPTR(header));
1429                         memcpy(ptrcreate, header, tmpsize);
1430                         mhashInsert(oidmod[i], ptrcreate);
1431
1432                         offset += tmpsize;
1433 #endif
1434     }
1435                 else{
1436
1437     GETSIZE(tmpsize,header);
1438
1439     {
1440       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1441       struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
1442       dst->type=src->type;
1443       dst->___cachedCode___=src->___cachedCode___;
1444       dst->___cachedHash___=src->___cachedHash___;
1445       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1446     }
1447     header->version += 1;
1448 #ifdef DEBUG
1449     printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
1450 #endif
1451     /* If threads are waiting on this object to be updated, notify them */
1452     if(header->notifylist != NULL) {
1453 #ifdef DEBUG
1454       printf("%s -> type : %d notifylist : %d\n",__func__,TYPE(header),header->notifylist);
1455 #endif
1456
1457 #ifdef RECOVERY
1458       if(header->isBackup != 0)
1459         notifyAll(&header->notifylist, OID(header), header->version);
1460       else
1461         clearNotifyList(OID(header));
1462 #else
1463         notifyAll(&header->notifylist, OID(header), header->version);
1464 #endif
1465
1466     }
1467     offset += sizeof(objheader_t) + tmpsize;
1468   }
1469 }
1470   if (nummod > 0)
1471     free(modptr);
1472
1473   /* Unlock locked objects */
1474   int useWriteUnlock = 0;
1475   for(i = 0; i < numlocked; i++) {
1476     if(oidlocked[i] == -1) {
1477       useWriteUnlock = 1;
1478       continue;
1479     }
1480     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1481       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1482       return 1;
1483     }
1484                 
1485 #ifdef DEBUG
1486                 printf("header oid:%d, version:%d, useWriteUnlock:%d\n", OID(header), header->version, useWriteUnlock);
1487 #endif
1488     if(useWriteUnlock) {
1489       write_unlock(STATUSPTR(header));
1490     } else {
1491       read_unlock(STATUSPTR(header));
1492     }
1493   }
1494   //TODO Update location lookup table
1495   return 0;
1496 }
1497
1498 /* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
1499  * Looks for the objects to be prefetched in the main object store.
1500  * If objects are not found then record those and if objects are found
1501  * then use offset values to prefetch references to other objects */
1502
1503 int prefetchReq(int acceptfd) {
1504   int i, size, objsize, numoffset = 0;
1505   int length;
1506   char *recvbuffer, control;
1507   unsigned int oid, mid=-1;
1508   objheader_t *header;
1509   oidmidpair_t oidmid;
1510   int sd = -1;
1511
1512   while(1) {
1513     recv_data((int)acceptfd, &numoffset, sizeof(int));
1514     if(numoffset == -1)
1515       break;
1516     recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
1517     oid = oidmid.oid;
1518     if (mid != oidmid.mid) {
1519       if (mid!=-1) {
1520         freeSockWithLock(transPResponseSocketPool, mid, sd);
1521       }
1522       mid=oidmid.mid;
1523       sd = getSockWithLock(transPResponseSocketPool, mid);
1524     }
1525     short offsetarry[numoffset];
1526     recv_data((int) acceptfd, offsetarry, numoffset*sizeof(short));
1527
1528     /*Process each oid */
1529     if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
1530       /* Save the oids not found in buffer for later use */
1531       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
1532       char sendbuffer[size];
1533       *((int *) sendbuffer) = size;
1534       *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
1535       *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
1536       control = TRANS_PREFETCH_RESPONSE;
1537       sendPrefetchResponse(sd, &control, sendbuffer, &size);
1538     } else { /* Object Found */
1539       int incr = 0;
1540       GETSIZE(objsize, header);
1541       size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
1542       char sendbuffer[size];
1543       *((int *)(sendbuffer + incr)) = size;
1544       incr += sizeof(int);
1545       *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
1546       incr += sizeof(char);
1547       *((unsigned int *)(sendbuffer+incr)) = oid;
1548       incr += sizeof(unsigned int);
1549       memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
1550
1551       control = TRANS_PREFETCH_RESPONSE;
1552       sendPrefetchResponse(sd, &control, sendbuffer, &size);
1553
1554       /* Calculate the oid corresponding to the offset value */
1555       for(i = 0 ; i< numoffset ; i++) {
1556         /* Check for arrays  */
1557         if(TYPE(header) >= NUMCLASSES) {
1558           int elementsize = classsize[TYPE(header)];
1559           struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1560           unsigned short length = ao->___length___;
1561           /* Check if array out of bounds */
1562           if(offsetarry[i]< 0 || offsetarry[i] >= length) {
1563             break;
1564           }
1565           oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
1566         } else {
1567           oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
1568         }
1569
1570         /* Don't continue if we hit a NULL pointer */
1571         if (oid==0)
1572           break;
1573
1574         if((header = mhashSearch(oid)) == NULL) {
1575           size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
1576           char sendbuffer[size];
1577           *((int *) sendbuffer) = size;
1578           *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
1579           *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
1580
1581           control = TRANS_PREFETCH_RESPONSE;
1582           sendPrefetchResponse(sd, &control, sendbuffer, &size);
1583           break;
1584         } else { /* Obj Found */
1585           int incr = 0;
1586           GETSIZE(objsize, header);
1587           size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
1588           char sendbuffer[size];
1589           *((int *)(sendbuffer + incr)) = size;
1590           incr += sizeof(int);
1591           *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
1592           incr += sizeof(char);
1593           *((unsigned int *)(sendbuffer+incr)) = oid;
1594           incr += sizeof(unsigned int);
1595           memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
1596
1597           control = TRANS_PREFETCH_RESPONSE;
1598           sendPrefetchResponse(sd, &control, sendbuffer, &size);
1599         }
1600       } //end of for
1601     }
1602   } //end of while
1603     //Release socket
1604   if (mid!=-1)
1605     freeSockWithLock(transPResponseSocketPool, mid, sd);
1606
1607   return 0;
1608 }
1609
/* Sends one prefetch response on socket sd: first the single control byte
 * pointed to by control, then the first *size bytes of sendbuffer.
 * Neither buffer is modified. */
void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
#ifdef DEBUG
  /* Print the control byte itself, not the pointer that holds it */
  printf("control = %d, file = %s, line = %d\n", (int)*control, __FILE__, __LINE__);
#endif
  send_data(sd, control, sizeof(char));
  /* Send the buffer; its length is passed by pointer by the callers */
  send_data(sd, sendbuffer, *size);
}
1619
1620 void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
1621   objheader_t *header;
1622   unsigned int oid;
1623   unsigned short newversion;
1624   char msg[1+  2 * sizeof(unsigned int) + sizeof(unsigned short)];
1625   int sd;
1626   struct sockaddr_in remoteAddr;
1627   int bytesSent;
1628   int size;
1629   int i = 0;
1630
1631   while(i < numoid) {
1632     oid = *(oidarry + i);
1633     if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
1634       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1635       return;
1636     } else {
1637       /* Check to see if versions are same */
1638 checkversion:
1639       if (write_trylock(STATUSPTR(header))) { // Can acquire write lock
1640         newversion = header->version;
1641           
1642         if(newversion == *(versionarry + i)) {
1643                 //Add to the notify list
1644               if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
1645                   printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
1646                 return;
1647               }
1648             write_unlock(STATUSPTR(header));
1649         } 
1650         else {
1651           write_unlock(STATUSPTR(header));
1652           if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1653                   perror("processReqNotify():socket()");
1654                   return;
1655             }
1656               bzero(&remoteAddr, sizeof(remoteAddr));
1657             remoteAddr.sin_family = AF_INET;
1658               remoteAddr.sin_port = htons(LISTEN_PORT);
1659             remoteAddr.sin_addr.s_addr = htonl(mid);
1660   
1661           if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1662                 printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
1663             inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1664               close(sd);
1665               return;
1666               } else {
1667                 //Send Update notification
1668               msg[0] = THREAD_NOTIFY_RESPONSE;
1669               *((unsigned int *)&msg[1]) = oid;
1670             size = sizeof(unsigned int);
1671                   *((unsigned short *)(&msg[1]+size)) = newversion;
1672             size += sizeof(unsigned short);
1673               *((unsigned int *)(&msg[1]+size)) = threadid;
1674               size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
1675             send_data(sd, msg, size);
1676                   }
1677           close(sd);
1678               }
1679       } else {
1680             randomdelay();
1681           goto checkversion;
1682       }
1683     }
1684     i++;
1685   }
1686   free(oidarry);
1687   free(versionarry);
1688 }
1689
1690 #ifdef RECOVERY
1691 /* go through oid's notifylist and clear them */
1692 void clearNotifyList(unsigned int oid)
1693 {
1694 #ifdef DEBUG
1695   printf("%s -> Entering\n",__func__);
1696 #endif
1697
1698   objheader_t* header;
1699   threadlist_t* t;
1700   threadlist_t* tmp;
1701   
1702   if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
1703     printf("%s -> mhashSearch returned NULL!!\n",__func__);
1704   }
1705
1706   if(header->notifylist != NULL) {
1707       t = header->notifylist;
1708        
1709       while(t) {
1710         tmp = t;
1711         t = t->next;
1712
1713         free(tmp);
1714       }
1715       header->notifylist = NULL;
1716   }
1717   
1718   pthread_mutex_lock(&clearNotifyList_mutex);
1719   clearNotifyListFlag = 0;
1720   pthread_mutex_unlock(&clearNotifyList_mutex);
1721 #ifdef DEBUG
1722   printf("%s -> finished\n",__func__);
1723 #endif
1724 }
1725 #endif